Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/10/16 01:39:25 UTC

[iotdb] 01/01: Merge branch 'master' into load_v2

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ffe7f4e15d78e0bd5db8ca7a1674cb0f8f2170d9
Merge: e7562e8a60b ba77da8a75a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Oct 16 09:43:33 2023 +0800

    Merge branch 'master' into load_v2
    
    # Conflicts:
    #       iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
    #       iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
    #       iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/mlnode/MLNodeInfo.java

 .github/workflows/client-go.yml                    |    1 +
 .github/workflows/client-python.yml                |    4 +
 .github/workflows/cluster-it-1c1d.yml              |    1 +
 .github/workflows/cluster-it-1c3d.yml              |    1 +
 .github/workflows/grafana-plugin.yml               |    5 +
 .github/workflows/iotdb-ml.yml                     |    1 +
 .github/workflows/pipe-it-2cluster.yml             |    1 +
 .../{sonar-coveralls.yml => sonar-codecov.yml}     |   22 +-
 .github/workflows/unit-test.yml                    |    1 +
 .gitignore                                         |   12 +-
 iotdb-core/mlnode/README.md => .mvn/extensions.xml |   42 +-
 .mvn/gradle-enterprise.xml                         |   48 +
 Jenkinsfile                                        |    6 +-
 LICENSE                                            |   21 +-
 NOTICE                                             |    2 +-
 NOTICE-binary                                      |    2 +-
 README.md                                          |    4 +-
 checkstyle.xml                                     |    4 +-
 code-coverage/pom.xml                              |    2 -
 codecov.yml                                        |    6 +-
 distribution/pom.xml                               |   13 +-
 .../src/assembly/flink-sql-connector.xml           |   48 +-
 example/client-cpp-example/pom.xml                 |    1 -
 example/flink-sql/pom.xml                          |   99 ++
 .../org/apache/iotdb/example/BatchSinkExample.java |   74 ++
 .../apache/iotdb/example/BoundedScanExample.java   |   52 +
 .../java/org/apache/iotdb/example/CDCExample.java  |   54 +
 .../org/apache/iotdb/example/LookupExample.java    |   77 ++
 .../apache/iotdb/example/StreamingSinkExample.java |   68 ++
 ....apache.iotdb.db.protocol.mqtt.PayloadFormatter |   19 +
 example/pom.xml                                    |    1 +
 example/rest-java-example/pom.xml                  |    4 -
 example/session/pom.xml                            |    5 -
 example/trigger/pom.xml                            |    2 +-
 example/udf/pom.xml                                |    2 +-
 integration-test/checkstyle.xml                    |    2 +-
 integration-test/import-control.xml                |  194 ++-
 integration-test/pom.xml                           |  163 ++-
 .../iotdb/it/env/cluster/ClusterConstant.java      |    2 +
 .../it/env/cluster/config/MppCommonConfig.java     |   13 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   14 +
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |   60 +-
 .../iotdb/it/env/cluster/env/Cluster1Env.java      |    6 +
 .../iotdb/it/env/cluster/env/MultiClusterEnv.java  |    6 +
 .../apache/iotdb/it/env/cluster/env/SimpleEnv.java |    6 +
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |    9 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   10 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   13 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   11 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |    4 +
 .../apache/iotdb/cli/it/StartClientScriptIT.java   |   32 -
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java |   27 -
 .../it/cluster/IoTDBClusterRestartIT.java          |   29 +-
 .../it/database/IoTDBDatabaseSetAndDeleteIT.java   |   23 +-
 .../it/partition/IoTDBPartitionGetterIT.java       |    8 +-
 .../confignode/it/utils/ConfigNodeTestUtils.java   |    7 -
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  |  110 +-
 .../apache/iotdb/db/it/IoTDBDatetimeFormatIT.java  |    1 +
 .../apache/iotdb/db/it/IoTDBInsertMultiRowIT.java  |   11 +
 .../apache/iotdb/db/it/IoTDBInsertWithQueryIT.java |   23 +-
 .../iotdb/db/it/IoTDBPartialInsertionIT.java       |   71 ++
 .../apache/iotdb/db/it/{ => auth}/IoTDBAuthIT.java |  529 ++++++--
 .../it => db/it/auth}/IoTDBClusterAuthorityIT.java |  208 ++--
 .../iotdb/db/it/auth/IoTDBSeriesPermissionIT.java  |  305 +++++
 .../iotdb/db/it/auth/IoTDBSystemPermissionIT.java  |  329 +++++
 .../db/it/auth/IoTDBTemplatePermissionIT.java      |  161 +++
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  |   88 --
 .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java   |    8 +-
 .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java  |   29 +-
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  |    9 +-
 .../db/it/strangepath/IoTDBStrangePathIT.java      |   78 ++
 .../db/it/trigger/IoTDBTriggerManagementIT.java    |  117 +-
 .../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java     |   17 +-
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  211 +++-
 .../org/apache/iotdb/flink/it/AbstractTest.java    |   46 +
 .../java/org/apache/iotdb/flink/it/SinkTest.java   |  163 +++
 .../java/org/apache/iotdb/flink/it/SourceTest.java |  192 +++
 .../test/java/org/apache/iotdb/flink/it/Utils.java |   93 ++
 .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java   |  766 ++++++++++++
 ...ncIT.java => IoTDBPipeConnectorParallelIT.java} |   56 +-
 .../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java  |   64 +-
 .../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java |  645 ++++++++++
 .../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java  |  415 +++++++
 .../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java     |  328 +++++
 .../apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java    |  667 +++++++++++
 .../pipe/it/extractor/IoTDBPipeExtractorIT.java    |  605 ++++++++++
 .../iotdb/session/it/IoTDBSessionComplexIT.java    |  163 +++
 .../iotdb/session/it/IoTDBSessionSimpleIT.java     |   81 ++
 iotdb-api/external-api/pom.xml                     |    2 +-
 iotdb-api/pipe-api/pom.xml                         |    2 +-
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |   10 +-
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |   10 +-
 .../event/dml/insertion/TsFileInsertionEvent.java  |    2 +-
 iotdb-api/trigger-api/pom.xml                      |    2 +-
 iotdb-api/udf-api/pom.xml                          |    2 +-
 iotdb-client/cli/pom.xml                           |    2 -
 .../cli/src/assembly/resources/sbin/start-cli.sh   |    2 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     |   56 +-
 .../src/main/java/org/apache/iotdb/cli/Cli.java    |    4 -
 .../java/org/apache/iotdb/tool/ExportTsFile.java   |   14 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |    2 +-
 .../java/org/apache/iotdb/cli/AbstractCliIT.java   |   39 -
 iotdb-client/client-cpp/pom.xml                    |    7 +-
 iotdb-client/client-py/.gitignore                  |    2 +
 iotdb-client/client-py/SessionPoolExample.py       |   21 +-
 iotdb-client/client-py/iotdb/IoTDBContainer.py     |    9 +-
 iotdb-client/client-py/iotdb/Session.py            |  113 +-
 iotdb-client/client-py/iotdb/SessionPool.py        |   60 +-
 .../iotdb/tsfile/utils/ReadWriteIOUtils.py         |   12 +-
 iotdb-client/client-py/iotdb/utils/Field.py        |  148 +--
 .../client-py/iotdb/utils/IoTDBConstants.py        |   33 +-
 .../client-py/iotdb/utils/IoTDBRpcDataSet.py       |  299 +++--
 iotdb-client/client-py/iotdb/utils/NumpyTablet.py  |    8 +-
 .../client-py/iotdb/utils/SessionDataSet.py        |   90 +-
 iotdb-client/client-py/iotdb/utils/Tablet.py       |   14 +-
 iotdb-client/client-py/pom.xml                     |   62 +-
 iotdb-client/client-py/release.sh                  |   19 +-
 iotdb-client/client-py/requirements_dev.txt        |    2 +-
 iotdb-client/client-py/{ => resources}/setup.py    |    2 +-
 iotdb-client/client-py/tests/test_session.py       |   60 +-
 iotdb-client/client-py/tests/test_session_pool.py  |   33 +-
 iotdb-client/client-py/tests/test_tablet.py        |   18 +-
 iotdb-client/jdbc/pom.xml                          |    4 +-
 iotdb-client/service-rpc/pom.xml                   |    2 +-
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  |   25 +
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   29 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |    6 +-
 .../java/org/apache/iotdb/rpc/RpcUtilsTest.java    |   67 ++
 iotdb-client/session/pom.xml                       |    2 +-
 .../iotdb/session/SessionConnectionTest.java       |  388 ++++++
 .../java/org/apache/iotdb/session/SessionTest.java | 1175 ++++++++++++++++++
 .../session/pool/SessionPoolExceptionTest.java     |  266 ++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 1264 +++++++++++++++++++-
 .../iotdb/session/util/SessionUtilsTest.java       |  176 +++
 iotdb-connector/flink-sql-iotdb-connector/pom.xml  |   33 +-
 .../org/apache/iotdb/flink/sql/common/Options.java |    8 +-
 .../org/apache/iotdb/flink/sql/common/Utils.java   |   38 +-
 .../sql/factory/IoTDBDynamicTableFactory.java      |   96 +-
 .../sql/function/IoTDBBoundedScanFunction.java     |   50 +-
 .../flink/sql/function/IoTDBCDCSourceFunction.java |   93 +-
 .../flink/sql/function/IoTDBLookupFunction.java    |   40 +-
 .../flink/sql/function/IoTDBSinkFunction.java      |   74 +-
 iotdb-connector/grafana-connector/pom.xml          |   20 +-
 iotdb-connector/grafana-plugin/backend-compile.sh  |    2 +-
 iotdb-connector/grafana-plugin/go.mod              |   85 +-
 .../grafana-plugin/pkg/plugin/plugin.go            |    2 +-
 iotdb-connector/grafana-plugin/pom.xml             |   11 +-
 iotdb-connector/grafana-plugin/yarn.lock           |   16 +-
 iotdb-connector/hadoop/pom.xml                     |    4 +-
 .../apache/iotdb/hadoop/fileSystem/HDFSOutput.java |    5 +
 iotdb-connector/hive-connector/pom.xml             |    4 +-
 iotdb-connector/spark-iotdb-connector/pom.xml      |    3 -
 iotdb-connector/spark-tsfile/pom.xml               |    1 -
 iotdb-connector/zeppelin-interpreter/pom.xml       |   14 +-
 iotdb-core/antlr/pom.xml                           |    2 -
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |    9 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   60 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   49 +-
 iotdb-core/confignode/pom.xml                      |   22 +-
 .../resources/conf/iotdb-confignode.properties     |    2 +-
 .../client/async/AsyncDataNodeClientPool.java      |   45 +-
 .../client/sync/SyncConfigNodeClientPool.java      |    3 -
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   44 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   48 +-
 .../confignode/conf/SystemPropertiesUtils.java     |    5 +-
 .../consensus/request/ConfigPhysicalPlan.java      |   10 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   60 +-
 .../consensus/request/auth/AuthorPlan.java         |   19 +
 .../request/read/database/CountDatabasePlan.java   |   15 +-
 .../request/read/database/GetDatabasePlan.java     |    5 +-
 .../{ShowTrailPlan.java => GetModelInfoPlan.java}  |   38 +-
 .../{ShowTrailPlan.java => ShowTrialPlan.java}     |   38 +-
 .../read/partition/GetNodePathsPartitionPlan.java  |   12 +
 .../read/template/GetPathsSetTemplatePlan.java     |   11 +-
 .../request/write/model/UpdateModelInfoPlan.java   |   16 +-
 .../request/write/model/UpdateModelStatePlan.java  |   18 +-
 .../response/auth/PermissionInfoResp.java          |   40 +-
 .../GetModelInfoResp.java}                         |   36 +-
 .../response/{ => model}/ModelTableResp.java       |    2 +-
 .../TrialTableResp.java}                           |   28 +-
 .../statemachine/ConfigRegionStateMachine.java     |   65 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  157 ++-
 .../apache/iotdb/confignode/manager/IManager.java  |   33 +-
 .../iotdb/confignode/manager/ModelManager.java     |   99 +-
 .../confignode/manager/PermissionManager.java      |   56 +-
 .../iotdb/confignode/manager/ProcedureManager.java |   29 +-
 .../manager/consensus/ConsensusManager.java        |   28 +-
 .../iotdb/confignode/manager/cq/CQManager.java     |    9 -
 .../manager/load/balancer/RouteBalancer.java       |    2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |   17 +-
 .../manager/partition/PartitionManager.java        |    5 +-
 .../manager/partition/PartitionMetrics.java        |  250 ++--
 .../manager/pipe/runtime/PipeHeartbeatParser.java  |   27 +-
 .../pipe/runtime/PipeHeartbeatScheduler.java       |    8 +-
 .../manager/pipe/runtime/PipeMetaSyncer.java       |   12 +-
 .../manager/pipe/task/PipeTaskCoordinator.java     |   35 +-
 .../manager/pipe/task/PipeTaskCoordinatorLock.java |  103 ++
 .../manager/schema/ClusterSchemaManager.java       |   32 +-
 .../iotdb/confignode/persistence/AuthorInfo.java   |  417 +++++--
 .../iotdb/confignode/persistence/ModelInfo.java    |  148 ++-
 .../iotdb/confignode/persistence/cq/CQInfo.java    |    1 +
 .../persistence/executor/ConfigPlanExecutor.java   |   28 +-
 .../confignode/persistence/node/NodeInfo.java      |    7 +-
 .../persistence/partition/PartitionInfo.java       |   29 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |    2 +-
 .../persistence/schema/ClusterSchemaInfo.java      |   27 +-
 .../confignode/persistence/schema/ConfigMTree.java |   41 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   23 +-
 .../procedure/env/DataNodeRemoveHandler.java       |    2 +-
 .../procedure/impl/model/CreateModelProcedure.java |  125 +-
 .../procedure/impl/model/DropModelProcedure.java   |  131 +-
 .../impl/node/AddConfigNodeProcedure.java          |    9 +
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  176 ++-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |    2 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |    2 +-
 .../impl/sync/AuthOperationProcedure.java          |  235 ++++
 .../AuthOperationProcedureState.java}              |   10 +-
 .../procedure/state/model/CreateModelState.java    |    2 -
 .../procedure/store/ProcedureFactory.java          |    6 +
 .../confignode/procedure/store/ProcedureType.java  |    5 +-
 .../confignode/procedure/store/ProcedureWAL.java   |    3 +
 .../iotdb/confignode/service/ConfigNode.java       |   65 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   89 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |   75 +-
 .../consensus/response/pipe/PipeTableRespTest.java |    6 +-
 .../confignode/persistence/AuthorInfoTest.java     |  974 ++++++++++++---
 .../iotdb/confignode/persistence/NodeInfoTest.java |    2 +-
 .../confignode/persistence/PartitionInfoTest.java  |    2 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |    4 +-
 .../persistence/schema/ClusterSchemaInfoTest.java  |    6 +-
 .../persistence/schema/ConfigMTreeTest.java        |   44 +-
 .../impl/sync/AuthOperationProcedureTest.java      |   98 ++
 iotdb-core/consensus/pom.xml                       |    2 +-
 .../org/apache/iotdb/consensus/IStateMachine.java  |    5 +
 .../apache/iotdb/consensus/config/RatisConfig.java |   50 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |   64 +-
 .../consensus/iot/IoTConsensusServerMetrics.java   |  154 ++-
 .../consensus/iot/client/DispatchLogHandler.java   |    3 +-
 .../iot/client/IoTConsensusClientPool.java         |   11 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |    3 +-
 .../logdispatcher/LogDispatcherThreadMetrics.java  |   50 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |    3 +
 .../ratis/ApplicationStateMachineProxy.java        |    5 +
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |   28 +
 .../simple/SimpleConsensusServerImpl.java          |    1 +
 .../apache/iotdb/consensus/iot/ReplicateTest.java  |  129 +-
 .../consensus/ratis/{ => utils}/UtilsTest.java     |   32 +-
 iotdb-core/datanode/pom.xml                        |   26 +-
 .../resources/conf/iotdb-datanode.properties       |    2 +-
 .../check-overlap-sequence-files-and-repair.bat    |   62 +
 .../check-overlap-sequence-files-and-repair.sh     |   46 +-
 .../tools/tsfile/overlap-statistic-tool.bat        |   62 +
 .../tools/tsfile/overlap-statistic-tool.sh         |   46 +-
 .../org/apache/iotdb/db/audit/AuditLogger.java     |    7 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  384 +++---
 .../apache/iotdb/db/auth/AuthorizerManager.java    |  432 -------
 .../apache/iotdb/db/auth/BasicAuthorityCache.java  |   12 +-
 .../iotdb/db/auth/ClusterAuthorityFetcher.java     |  445 +++++--
 .../org/apache/iotdb/db/auth/IAuthorCache.java     |    2 +
 .../apache/iotdb/db/auth/IAuthorityFetcher.java    |   14 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   27 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |    5 +
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |   27 +-
 .../db/consensus/SchemaRegionConsensusImpl.java    |    4 +
 .../dataregion/DataExecutionVisitor.java           |    2 +-
 .../{ => runtime}/IntoProcessException.java        |    2 +-
 .../ModelInferenceProcessException.java}           |    6 +-
 .../{ => runtime}/WriteLockFailedException.java    |    2 +-
 .../pipe/agent/runtime/PipeCronEventInjector.java  |    2 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |    5 +-
 .../SimpleConsensusProgressIndexAssigner.java      |   12 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |   89 +-
 .../config/constant/PipeConnectorConstant.java     |   16 +
 ...eResponse.java => AirGapELanguageConstant.java} |   14 +-
 .../payload/airgap/AirGapOneByteResponse.java      |    4 +
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |    7 +-
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |    3 +-
 .../protocol/airgap/IoTDBAirGapConnector.java      |   33 +-
 .../connector/protocol/opcua/OpcUaConnector.java   |  290 +++++
 .../protocol/opcua/OpcUaKeyStoreLoader.java        |  120 ++
 .../protocol/opcua/OpcUaServerBuilder.java         |  287 +++++
 .../thrift/async/IoTDBThriftAsyncConnector.java    |   19 +-
 .../thrift/sync/IoTDBThriftSyncConnector.java      |    4 +-
 ...ocketConnector.java => WebSocketConnector.java} |   67 +-
 .../websocket/WebSocketConnectorServer.java        |   84 +-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |   54 +-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  150 ++-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |    3 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |    4 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   34 +-
 .../tsfile/TsFileInsertionDataContainer.java       |    3 +-
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |   14 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  123 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   49 +-
 .../PipeRealtimeDataRegionFakeExtractor.java       |    2 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  208 ++--
 .../PipeRealtimeDataRegionLogExtractor.java        |   40 +-
 .../PipeRealtimeDataRegionTsFileExtractor.java     |   40 +-
 .../realtime/assigner/DisruptorQueue.java          |    6 +
 .../realtime/assigner/PipeDataRegionAssigner.java  |    2 +-
 .../pipe/extractor/realtime/epoch/TsFileEpoch.java |    3 +-
 .../realtime/epoch/TsFileEpochManager.java         |    4 +-
 .../matcher/CachedSchemaPatternMatcher.java        |    6 +-
 .../pipe/receiver/airgap/IoTDBAirGapReceiver.java  |   58 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |    5 +-
 .../resource/tsfile/PipeTsFileResourceManager.java |   16 +-
 .../db/pipe/resource/wal/PipeWALResource.java      |    2 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  |    4 +
 .../apache/iotdb/db/pipe/task/PipeTaskManager.java |   30 +-
 .../pipe/task/connection/BlockingPendingQueue.java |   66 +-
 .../db/pipe/task/connection/EnrichedDeque.java     |   98 ++
 .../pipe/task/connection/PipeEventCollector.java   |   39 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |    7 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |   54 +-
 .../subtask/connector/PipeConnectorSubtask.java    |   59 +-
 .../connector/PipeConnectorSubtaskManager.java     |  138 ++-
 .../subtask/processor/PipeProcessorSubtask.java    |   77 +-
 .../iotdb/db/protocol/client/ConfigNodeClient.java |   75 +-
 .../iotdb/db/protocol/client/ConfigNodeInfo.java   |    2 +-
 .../db/protocol/client/DataNodeInternalClient.java |    2 +-
 .../db/protocol/mqtt/BrokerAuthenticator.java      |   14 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |    2 +
 .../protocol/rest/filter/AuthorizationFilter.java  |    6 +-
 .../rest/handler/AuthorizationHandler.java         |   19 +-
 .../PingApiServiceImpl.java}                       |   41 +-
 .../v1/handler/StatementConstructionHandler.java   |   19 +-
 .../v2/handler/StatementConstructionHandler.java   |   20 +-
 .../iotdb/db/protocol/session/SessionManager.java  |    4 +-
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   10 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   33 +-
 .../protocol/thrift/impl/MLNodeRPCServiceImpl.java |   14 +-
 .../common/header/ColumnHeaderConstant.java        |    9 +-
 .../common/header/DatasetHeaderFactory.java        |    4 +-
 .../execution/exchange/SharedTsBlockQueue.java     |    9 +
 .../fragment/FragmentInstanceManager.java          |   11 +
 .../execution/load/AlignedChunkData.java           |    4 +-
 .../execution/load/DataPartitionBatchFetcher.java  |    5 +-
 .../load/DeviceBatchTsFileDataManager.java         |    5 +-
 .../execution/load/LoadTsFileManager.java          |    7 +-
 .../execution/load/MergedTsFileSplitter.java       |   43 +-
 .../execution/load/NonAlignedChunkData.java        |    4 +-
 .../execution/load/TsFileDataManager.java          |    8 +-
 .../execution/load/TsFileSplitSender.java          |   54 +-
 .../queryengine/execution/load/TsFileSplitter.java |   20 +-
 .../nodesplit/ClusteringMeasurementSplitter.java   |  167 +--
 .../queryengine/execution/memory/MemoryPool.java   |    1 +
 .../execution/operator/AggregationUtil.java        |    2 +-
 .../process/AbstractConsumeAllOperator.java        |  101 +-
 .../operator/process/AbstractIntoOperator.java     |    2 +-
 .../operator/process/MergeSortOperator.java        |   61 +-
 .../process/join/RowBasedTimeJoinOperator.java     |   57 +-
 .../operator/process/last/LastQueryOperator.java   |    6 +-
 .../process/last/LastQuerySortOperator.java        |    4 +-
 .../process/last/LastQueryTransformOperator.java   |  120 ++
 .../operator/process/ml/ForecastOperator.java      |  241 ++++
 .../operator/schema/source/DeviceSchemaSource.java |   13 +-
 .../schema/source/LogicalViewSchemaSource.java     |   17 +-
 .../operator/schema/source/NodeSchemaSource.java   |   11 +-
 .../schema/source/PathsUsingTemplateSource.java    |    8 +-
 .../schema/source/SchemaSourceFactory.java         |   42 +-
 .../schema/source/TimeSeriesSchemaSource.java      |   14 +-
 .../db/queryengine/plan/analyze/Analysis.java      |   41 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  280 ++++-
 .../db/queryengine/plan/analyze/Analyzer.java      |   13 +-
 .../plan/analyze/ClusterPartitionFetcher.java      |   39 +-
 .../plan/analyze/ConcatPathRewriter.java           |   11 +-
 .../plan/analyze/ExpressionAnalyzer.java           |    7 +
 .../plan/analyze/ExpressionTypeAnalyzer.java       |    2 +
 .../plan/analyze/IModelFetcher.java}               |    9 +-
 .../plan/analyze/IPartitionFetcher.java            |   12 +-
 .../plan/analyze/LoadTsfileAnalyzer.java           |  126 +-
 .../db/queryengine/plan/analyze/ModelFetcher.java  |   75 ++
 .../queryengine/plan/analyze/SelectIntoUtils.java  |   10 +-
 .../analyze/cache/partition/PartitionCache.java    |   54 +-
 .../analyze/cache/schema/DataNodeSchemaCache.java  |   64 +-
 .../schema/DeviceUsingTemplateSchemaCache.java     |   18 +
 .../analyze/cache/schema/SchemaCacheEntry.java     |   16 +-
 .../cache/schema/TimeSeriesSchemaCache.java        |   71 +-
 .../cache/schema/dualkeycache/IDualKeyCache.java   |   22 +
 .../dualkeycache/impl/CacheEntryGroupImpl.java     |    6 +
 .../schema/dualkeycache/impl/DualKeyCacheImpl.java |  159 ++-
 .../dualkeycache/impl/FIFOCacheEntryManager.java   |   60 +-
 .../schema/dualkeycache/impl/ICacheEntry.java      |    2 +
 .../schema/dualkeycache/impl/ICacheEntryGroup.java |    4 +
 .../dualkeycache/impl/ICacheEntryManager.java      |    2 +
 .../dualkeycache/impl/LRUCacheEntryManager.java    |   86 +-
 .../schema/lastcache/DataNodeLastCacheManager.java |    6 +-
 .../schema/lastcache/ILastCacheContainer.java      |    2 +-
 .../analyze/schema/AutoCreateSchemaExecutor.java   |   40 +
 .../analyze/schema/ClusterSchemaFetchExecutor.java |   14 +-
 .../plan/execution/config/ConfigExecution.java     |   10 +-
 .../plan/execution/config/ConfigTaskVisitor.java   |  208 ++--
 .../config/executor/ClusterConfigTaskExecutor.java |  317 +++--
 .../config/executor/IConfigTaskExecutor.java       |   12 +-
 .../config/metadata/CreateContinuousQueryTask.java |   13 +-
 .../config/metadata/CreatePipePluginTask.java      |    2 +-
 .../config/metadata/DropPipePluginTask.java        |    2 +-
 .../config/metadata/ShowClusterDetailsTask.java    |   54 +-
 .../execution/config/metadata/ShowClusterTask.java |   30 +-
 .../config/metadata/model/CreateModelTask.java     |    7 +-
 .../config/metadata/model/ShowModelsTask.java      |   12 +-
 .../{ShowTrailsTask.java => ShowTrialsTask.java}   |   10 +-
 .../config/metadata/view/AlterLogicalViewTask.java |   10 +-
 .../plan/execution/config/sys/AuthorizerTask.java  |    7 +-
 .../plan/expression/ExpressionFactory.java         |    9 +
 .../plan/expression/multi/FunctionExpression.java  |   10 +
 .../plan/expression/multi/FunctionType.java        |    3 +-
 .../visitor/RemoveRootPrefixVisitor.java           |   45 +
 .../db/queryengine/plan/parser/ASTVisitor.java     |  174 ++-
 .../plan/parser/StatementGenerator.java            |   75 +-
 .../plan/planner/LogicalPlanBuilder.java           |  196 ++-
 .../plan/planner/LogicalPlanVisitor.java           |   67 +-
 .../plan/planner/OperatorTreeGenerator.java        |  107 +-
 .../plan/planner/SubPlanTypeExtractor.java         |   15 +
 .../distribution/DistributionPlanContext.java      |    5 +-
 .../planner/distribution/DistributionPlanner.java  |   10 +-
 .../planner/distribution/ExchangeNodeAdder.java    |   82 +-
 .../planner/distribution/NodeGroupContext.java     |   18 +-
 .../SimpleFragmentParallelPlanner.java             |    5 +-
 .../plan/planner/distribution/SourceRewriter.java  |   91 +-
 .../db/queryengine/plan/planner/plan/SubPlan.java  |    5 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   28 +
 .../plan/planner/plan/node/PlanNodeType.java       |    9 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   10 +
 .../plan/node/load/LoadSingleTsFileNode.java       |    6 +-
 .../plan/node/metedata/read/DevicesCountNode.java  |   13 +-
 .../node/metedata/read/DevicesSchemaScanNode.java  |   13 +-
 .../metedata/read/LevelTimeSeriesCountNode.java    |   13 +-
 .../metedata/read/LogicalViewSchemaScanNode.java   |   17 +-
 .../metedata/read/NodePathsSchemaScanNode.java     |   12 +-
 .../metedata/read/PathsUsingTemplateScanNode.java  |   11 +-
 .../node/metedata/read/SchemaQueryScanNode.java    |   43 +-
 .../node/metedata/read/TimeSeriesCountNode.java    |   15 +-
 .../metedata/read/TimeSeriesSchemaScanNode.java    |   29 +-
 .../node/process/last/LastQueryCollectNode.java    |   17 +-
 .../plan/node/process/last/LastQueryMergeNode.java |   24 +-
 .../plan/node/process/last/LastQueryNode.java      |   27 +-
 ...ollectNode.java => LastQueryTransformNode.java} |   92 +-
 .../planner/plan/node/process/ml/ForecastNode.java |  122 ++
 .../planner/plan/node/sink/IdentitySinkNode.java   |    5 +
 .../plan/node/source/AlignedLastQueryScanNode.java |   20 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |    4 +-
 .../plan/node/source/LastQueryScanNode.java        |   20 +-
 .../planner/plan/node/write/DeleteDataNode.java    |   24 +-
 .../plan/planner/plan/node/write/InsertNode.java   |    1 +
 .../planner/plan/node/write/InsertRowNode.java     |    6 +-
 .../planner/plan/node/write/InsertRowsNode.java    |    4 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |    4 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   78 +-
 .../plan/node/write/PipeEnrichedInsertNode.java    |    1 +
 .../model/ForecastModelInferenceDescriptor.java    |  176 +++
 .../parameter/model/ModelInferenceDescriptor.java  |  111 ++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |    8 +-
 .../statement/AuthorityInformationStatement.java   |   48 +
 .../db/queryengine/plan/statement/Statement.java   |   13 +-
 .../plan/statement/StatementVisitor.java           |   12 +-
 .../plan/statement/component/IntoComponent.java    |    2 +
 .../plan/statement/component/ResultColumn.java     |    5 +-
 .../plan/statement/component/SelectComponent.java  |   10 +-
 .../plan/statement/crud/DeleteDataStatement.java   |   17 +
 .../plan/statement/crud/InsertBaseStatement.java   |   17 +
 .../plan/statement/crud/InsertRowStatement.java    |    4 +-
 .../crud/InsertRowsOfOneDeviceStatement.java       |    5 +-
 .../plan/statement/crud/InsertStatement.java       |   17 +
 .../plan/statement/crud/InsertTabletStatement.java |   17 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |    8 +
 .../crud/PipeEnrichedInsertBaseStatement.java      |    8 +-
 .../crud/PipeEnrichedLoadTsFileStatement.java      |    5 -
 .../plan/statement/crud/QueryStatement.java        |  100 +-
 .../InternalCreateMultiTimeSeriesStatement.java    |   25 +-
 .../InternalCreateTimeSeriesStatement.java         |   17 +
 .../metadata/AlterTimeSeriesStatement.java         |   15 +
 .../metadata/CreateAlignedTimeSeriesStatement.java |   17 +
 .../metadata/CreateContinuousQueryStatement.java   |   14 +
 .../metadata/CreateFunctionStatement.java          |   14 +
 .../metadata/CreateMultiTimeSeriesStatement.java   |   17 +
 .../metadata/CreateTimeSeriesStatement.java        |   15 +
 .../statement/metadata/CreateTriggerStatement.java |   14 +
 .../metadata/DatabaseSchemaStatement.java          |   14 +
 .../metadata/DeleteDatabaseStatement.java          |   14 +
 .../metadata/DeleteTimeSeriesStatement.java        |   17 +
 .../metadata/DropContinuousQueryStatement.java     |   14 +
 .../statement/metadata/DropFunctionStatement.java  |   14 +
 .../statement/metadata/DropTriggerStatement.java   |   32 +-
 .../plan/statement/metadata/SetTTLStatement.java   |   21 +-
 .../statement/metadata/ShowClusterStatement.java   |   14 +
 .../metadata/ShowConfigNodesStatement.java         |    9 +
 .../metadata/ShowContinuousQueriesStatement.java   |   14 +
 .../statement/metadata/ShowDataNodesStatement.java |    9 +
 .../statement/metadata/ShowFunctionsStatement.java |   14 +
 .../statement/metadata/ShowRegionStatement.java    |    9 +
 .../plan/statement/metadata/ShowStatement.java     |    4 +-
 .../statement/metadata/ShowTriggersStatement.java  |   14 +
 .../statement/metadata/ShowVariablesStatement.java |   14 +
 .../metadata/model/CreateModelStatement.java       |   71 +-
 ...ailsStatement.java => ShowTrialsStatement.java} |    6 +-
 .../{ => pipe}/CreatePipePluginStatement.java      |   16 +-
 .../metadata/pipe/CreatePipeStatement.java         |   14 +
 .../{ => pipe}/DropPipePluginStatement.java        |   16 +-
 .../statement/metadata/pipe/DropPipeStatement.java |   14 +
 .../{ => pipe}/ShowPipePluginsStatement.java       |   17 +-
 .../metadata/pipe/ShowPipesStatement.java          |   14 +
 .../metadata/pipe/StartPipeStatement.java          |   14 +
 .../statement/metadata/pipe/StopPipeStatement.java |   14 +
 .../template/ActivateTemplateStatement.java        |   30 +-
 .../template/BatchActivateTemplateStatement.java   |   36 +-
 .../template/DeactivateTemplateStatement.java      |   39 +-
 .../ShowNodesInSchemaTemplateStatement.java        |    9 +
 .../template/ShowPathSetTemplateStatement.java     |    9 +
 .../template/ShowSchemaTemplateStatement.java      |    9 +
 .../metadata/view/AlterLogicalViewStatement.java   |   45 +
 .../metadata/view/CreateLogicalViewStatement.java  |   39 +
 .../metadata/view/DeleteLogicalViewStatement.java  |   17 +
 .../metadata/view/RenameLogicalViewStatement.java  |   19 +
 .../plan/statement/sys/AuthorStatement.java        |  136 ++-
 .../plan/statement/sys/KillQueryStatement.java     |   14 +
 .../plan/statement/sys/ShowQueriesStatement.java   |   14 +
 .../plan/statement/sys/ShowVersionStatement.java   |    9 +
 .../MultiInputColumnIntermediateLayer.java         |   10 +-
 .../schemaengine/metric/SchemaEngineMemMetric.java |   24 -
 .../schemaengine/metric/SchemaMetricManager.java   |    3 +-
 .../metric/SchemaRegionCachedMetric.java           |    4 +-
 .../schemaengine/metric/SchemaRegionMemMetric.java |   36 +-
 .../schemaengine/schemaregion/ISchemaRegion.java   |    2 +-
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  |    4 +-
 .../schemaregion/impl/SchemaRegionPBTreeImpl.java  |    4 +-
 .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java     |   54 +-
 .../impl/mem/snapshot/MemMTreeSnapshotUtil.java    |    9 +-
 .../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java  |   56 +-
 .../schemaregion/mtree/traverser/Traverser.java    |   11 +-
 .../mtree/traverser/basic/DatabaseTraverser.java   |   10 +-
 .../mtree/traverser/basic/EntityTraverser.java     |   11 +-
 .../mtree/traverser/basic/MNodeTraverser.java      |   11 +-
 .../traverser/basic/MeasurementTraverser.java      |   12 +-
 .../traverser/collector/DatabaseCollector.java     |    9 +-
 .../mtree/traverser/collector/EntityCollector.java |    9 +-
 .../traverser/collector/MNodeAboveDBCollector.java |    9 +-
 .../mtree/traverser/collector/MNodeCollector.java  |    9 +-
 .../traverser/collector/MeasurementCollector.java  |    9 +-
 .../mtree/traverser/counter/DatabaseCounter.java   |   10 +-
 .../mtree/traverser/counter/EntityCounter.java     |   10 +-
 .../traverser/counter/MeasurementCounter.java      |    9 +-
 .../mtree/traverser/updater/EntityUpdater.java     |   11 +-
 .../traverser/updater/MeasurementUpdater.java      |   10 +-
 .../schemaregion/read/req/IShowSchemaPlan.java     |    3 +
 .../read/req/SchemaRegionReadPlanFactory.java      |   69 +-
 .../read/req/impl/AbstractShowSchemaPlanImpl.java  |   13 +-
 .../read/req/impl/ShowDevicesPlanImpl.java         |    6 +-
 .../read/req/impl/ShowNodesPlanImpl.java           |    6 +-
 .../read/req/impl/ShowTimeSeriesPlanImpl.java      |    6 +-
 .../template/ClusterTemplateManager.java           |    8 +-
 .../db/schemaengine/template/ITemplateManager.java |    4 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   22 +-
 .../apache/iotdb/db/service/MLNodeRPCService.java  |    8 +-
 .../iotdb/db/service/metrics/FileMetrics.java      |  542 +--------
 .../metrics/IoTDBInternalLocalReporter.java        |   11 +-
 .../metrics/file/CompactionFileMetrics.java        |  165 +++
 .../db/service/metrics/file/ModsFileMetrics.java   |   85 ++
 .../metrics/file/SystemRelatedFileMetrics.java     |  109 ++
 .../db/service/metrics/file/TsFileMetrics.java     |  414 +++++++
 .../db/service/metrics/file/WalFileMetrics.java    |   58 +
 .../iotdb/db/storageengine/StorageEngine.java      |   31 +-
 .../db/storageengine/dataregion/DataRegion.java    |  144 +--
 .../FileCannotTransitToCompactingException.java}   |   31 +-
 .../execute/task/AbstractCompactionTask.java       |   71 +-
 .../task/CompactionTaskType.java}                  |   16 +-
 .../execute/task/CrossSpaceCompactionTask.java     |   98 +-
 .../execute/task/InnerSpaceCompactionTask.java     |  112 +-
 .../compaction/execute/utils/CompactionUtils.java  |   86 +-
 .../execute/utils/log/CompactionLogger.java        |   34 +-
 .../compaction/io/CompactionTsFileWriter.java      |   34 +-
 .../compaction/schedule/CompactionScheduler.java   |   28 +-
 .../compaction/schedule/CompactionTaskManager.java |    4 -
 .../compaction/schedule/CompactionWorker.java      |   72 +-
 .../DefaultCompactionTaskComparatorImpl.java       |    7 +
 .../compaction/selector/ICompactionSelector.java   |    3 +-
 .../selector/IInnerSeqSpaceSelector.java           |    3 +-
 .../selector/IInnerUnseqSpaceSelector.java         |    3 +-
 .../impl/SizeTieredCompactionSelector.java         |  121 +-
 .../compaction/settle/SettleRequestHandler.java    |    1 -
 .../ITimeRange.java}                               |   14 +-
 .../Interval.java}                                 |   33 +-
 .../compaction/tool/ListTimeRangeImpl.java         |   89 ++
 .../compaction/tool/OverlapStatistic.java          |   91 ++
 .../compaction/tool/OverlapStatisticTool.java      |  248 ++++
 .../dataregion/compaction/tool/PrintUtil.java      |  209 ++++
 .../SequenceFileSubTaskThreadExecutor.java}        |   25 +-
 .../compaction/tool/SequenceFileTaskSummary.java   |   58 +
 .../compaction/tool/SingleSequenceFileTask.java    |  104 ++
 .../compaction/tool/TimePartitionProcessTask.java  |  143 +++
 .../tool/TimePartitionProcessWorker.java           |   58 +
 .../compaction/tool/TsFileStatisticReader.java     |   96 ++
 .../compaction/tool/UnseqSpaceStatistics.java      |   84 ++
 .../dataregion/flush/CompressionRatio.java         |  132 +-
 .../dataregion/memtable/AbstractMemTable.java      |   60 +-
 .../memtable/AlignedWritableMemChunk.java          |   19 +-
 .../dataregion/memtable/IMemTable.java             |    6 +
 .../dataregion/memtable/PrimitiveMemTable.java     |   18 +-
 .../dataregion/memtable/TsFileProcessor.java       |   16 +-
 .../dataregion/memtable/WritableMemChunk.java      |   18 +-
 .../io/LocalTextModificationAccessor.java          |   36 +-
 .../modification/io/ModificationWriter.java        |    3 +
 .../dataregion/snapshot/SnapshotLogger.java        |    6 +-
 .../dataregion/tsfile/TsFileManager.java           |   30 +-
 .../dataregion/tsfile/TsFileResource.java          |   94 +-
 .../tsfile/timeindex/DeviceTimeIndex.java          |    9 +-
 .../dataregion/tsfile/timeindex/FileTimeIndex.java |    6 +-
 .../storageengine/dataregion/wal/WALManager.java   |   27 +-
 .../dataregion/wal/buffer/WALBuffer.java           |   30 +-
 .../storageengine/dataregion/wal/node/WALNode.java |   22 +-
 .../dataregion/wal/recover/WALRecoverManager.java  |   15 +-
 .../wal/recover/file/TsFilePlanRedoer.java         |    4 +-
 .../file/UnsealedTsFileRecoverPerformer.java       |   10 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |   19 +-
 .../storageengine/rescon/disk/FolderManager.java   |    2 +
 .../rescon/memory/MemTableManager.java             |    8 +-
 .../db/storageengine/rescon/memory/SystemInfo.java |    9 +-
 .../iotdb/db/tools/TsFileSplitByPartitionTool.java |   10 +-
 .../TsFileOverlapValidationAndRepairTool.java      |  276 +++++
 .../org/apache/iotdb/db/utils/CommonUtils.java     |    7 +
 .../apache/iotdb/db/utils/TimePartitionUtils.java  |   49 -
 .../iotdb/db/utils/TimestampPrecisionUtils.java    |   44 +-
 .../iotdb/db/utils/constant/SqlConstant.java       |    3 +
 .../iotdb/db/auth/AuthorizerManagerTest.java       |  187 ++-
 .../auth/authorizer/LocalFileAuthorizerTest.java   |   78 +-
 .../iotdb/db/auth/entity/PathPrivilegeTest.java    |   52 +-
 .../org/apache/iotdb/db/auth/entity/RoleTest.java  |   23 +-
 .../org/apache/iotdb/db/auth/entity/UserTest.java  |   11 +-
 .../db/auth/role/LocalFileRoleAccessorTest.java    |   70 +-
 .../db/auth/role/LocalFileRoleManagerTest.java     |   99 +-
 .../db/auth/user/LocalFileUserAccessorTest.java    |   71 +-
 .../db/auth/user/LocalFileUserManagerTest.java     |  155 +--
 .../cache/dualkeycache/DualKeyCacheTest.java       |  185 ++-
 .../schemaRegion/SchemaRegionAliasAndTagTest.java  |   21 +-
 .../schemaRegion/SchemaRegionBasicTest.java        |  172 +--
 .../schemaRegion/SchemaRegionManagementTest.java   |   22 +-
 .../schemaRegion/SchemaRegionTemplateTest.java     |    6 +-
 .../schemaRegion/SchemaRegionTestUtil.java         |   90 +-
 .../extractor/CachedSchemaPatternMatcherTest.java  |    2 +-
 .../db/protocol/mqtt/BrokerAuthenticatorTest.java  |   15 +-
 .../db/queryengine/execution/DataDriverTest.java   |    1 +
 .../execution/load/LoadTsFileManagerTest.java      |   60 +-
 .../execution/load/MergedTsFileSplitterTest.java   |    2 +-
 .../db/queryengine/execution/load/TestBase.java    |   83 +-
 .../execution/load/TsFileSplitSenderTest.java      |   72 +-
 .../operator/AlignedSeriesScanOperatorTest.java    |    2 +
 .../operator/HorizontallyConcatOperatorTest.java   |    3 +
 .../execution/operator/LimitOperatorTest.java      |    2 +
 .../execution/operator/MergeSortOperatorTest.java  |  173 ++-
 .../execution/operator/OffsetOperatorTest.java     |    4 +
 .../execution/operator/OperatorMemoryTest.java     |    5 +-
 .../operator/SingleDeviceViewOperatorTest.java     |    2 +
 .../execution/operator/SortOperatorTest.java       |    3 +
 .../schema/SchemaQueryScanOperatorTest.java        |   12 +-
 .../queryengine/plan/analyze/AnalyzeFailTest.java  |   38 +-
 .../db/queryengine/plan/analyze/AnalyzeTest.java   |   25 +-
 .../plan/analyze/FakePartitionFetcherImpl.java     |    6 +-
 .../plan/analyze/cache/PartitionCacheTest.java     |   33 +-
 .../plan/parser/StatementGeneratorTest.java        |  353 +++++-
 .../plan/plan/QueryLogicalPlanUtil.java            |    2 +-
 .../distribution/DistributionPlannerBasicTest.java |   10 +-
 .../plan/plan/distribution/LastQueryTest.java      |    2 +-
 .../queryengine/plan/plan/distribution/Util.java   |    7 +-
 .../read/DeviceSchemaScanNodeSerdeTest.java        |    4 +-
 .../NodeManagementMemoryMergeNodeSerdeTest.java    |    6 +-
 .../read/PathsUsingTemplateScanNodeTest.java       |    6 +-
 .../metadata/read/SchemaCountNodeSerdeTest.java    |   12 +-
 .../read/TimeSeriesSchemaScanNodeSerdeTest.java    |    4 +-
 .../plan/node/write/WritePlanNodeSplitTest.java    |   30 +-
 .../security/encrypt/MessageDigestEncryptTest.java |    2 +-
 .../iotdb/db/storageengine/StorageEngineTest.java  |   14 +
 .../storageengine/dataregion/DataRegionTest.java   |   58 +-
 .../compaction/CompactionTaskComparatorTest.java   |  121 +-
 .../compaction/CompactionTaskManagerTest.java      |   26 +-
 ...yControlTest.java => CompactionWorkerTest.java} |   78 +-
 .../CrossSpaceCompactionWithUnusualCasesTest.java  |    9 -
 .../compaction/FastAlignedCrossCompactionTest.java |   47 -
 .../FastCompactionPerformerWithEmptyPageTest.java  |    2 -
 ...InconsistentCompressionTypeAndEncodingTest.java |    7 -
 .../FastCrossCompactionPerformerTest.java          |   17 +-
 .../FastInnerCompactionPerformerTest.java          |  121 +-
 .../FastNonAlignedCrossCompactionTest.java         |   47 -
 .../compaction/ReadChunkInnerCompactionTest.java   |   19 +-
 .../ReadPointAlignedCrossCompactionTest.java       |   47 -
 .../ReadPointNonAlignedCrossCompactionTest.java    |   47 -
 .../cross/CrossSpaceCompactionSelectorTest.java    |   92 +-
 .../CrossSpaceCompactionWithFastPerformerTest.java |    4 -
 ...eCompactionWithFastPerformerValidationTest.java |  173 +--
 ...sSpaceCompactionWithReadPointPerformerTest.java |    4 -
 ...actionWithReadPointPerformerValidationTest.java |  160 +--
 ...eCrossSpaceCompactionWithFastPerformerTest.java |   31 +-
 ...sSpaceCompactionWithReadPointPerformerTest.java |   31 +-
 .../inner/InnerCompactionEmptyTsFileTest.java      |    2 -
 .../InnerSeqCompactionWithFastPerformerTest.java   |   20 +-
 ...nerSeqCompactionWithReadChunkPerformerTest.java |   13 +-
 .../inner/InnerSpaceCompactionSelectorTest.java    |  112 +-
 .../SizeTieredCompactionRecoverTest.java           |    2 -
 ...eCrossSpaceCompactionRecoverCompatibleTest.java |    4 +-
 .../SizeTieredCompactionRecoverCompatibleTest.java |    4 +-
 .../recover/SizeTieredCompactionRecoverTest.java   |    3 +-
 .../settle/SettleRequestHandlerTest.java           |    2 +-
 .../compaction/tools/ListTimeRangeImplTest.java    |  138 +++
 .../compaction/tools/UnseqSpaceStatisticsTest.java |   63 +
 .../utils/CompactionFileGeneratorUtils.java        |   23 +-
 .../utils/CompactionUpdateFileCountTest.java       |  109 ++
 .../utils/MultiTsFileDeviceIteratorTest.java       |    7 -
 .../dataregion/flush/CompressionRatioTest.java     |  108 +-
 .../dataregion/memtable/MemTableFlushTaskTest.java |    4 +-
 .../dataregion/memtable/MemtableBenchmark.java     |    4 +-
 .../dataregion/memtable/PrimitiveMemTableTest.java |   16 +-
 .../wal/checkpoint/CheckpointManagerTest.java      |    8 +-
 .../dataregion/wal/io/CheckpointFileTest.java      |    8 +-
 .../dataregion/wal/node/WALEntryHandlerTest.java   |   20 +-
 .../dataregion/wal/node/WALNodeTest.java           |   10 +-
 .../wal/recover/WALRecoverManagerTest.java         |   13 +-
 .../wal/utils/WALInsertNodeCacheTest.java          |  161 ++-
 .../TsFileOverlapValidationAndRepairToolTest.java  |  193 +++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   18 +-
 .../db/utils/TimestampPrecisionUtilsTest.java      |   87 ++
 iotdb-core/metrics/interface/pom.xml               |    1 -
 .../metrics/metricsets/system/SystemMetrics.java   |   31 +-
 .../apache/iotdb/metrics/utils/SystemMetric.java   |    1 +
 iotdb-core/mlnode/.gitignore                       |    3 +-
 iotdb-core/mlnode/README.md                        |    4 +-
 .../mlnode/iotdb/mlnode/algorithm/factory.py       |   66 +
 .../iotdb/mlnode/algorithm/hyperparameter.py       |  407 +++++++
 iotdb-core/mlnode/iotdb/mlnode/algorithm/metric.py |   80 ++
 .../mlnode/algorithm/models/forecast/__init__.py}  |   10 -
 .../mlnode/algorithm/models/forecast/dlinear.py    |  133 ++
 .../mlnode/algorithm/models/forecast/nbeats.py     |  131 ++
 .../mlnode/iotdb/mlnode/algorithm/validator.py     |   45 +
 iotdb-core/mlnode/iotdb/mlnode/client.py           |  111 +-
 iotdb-core/mlnode/iotdb/mlnode/config.py           |   36 +
 iotdb-core/mlnode/iotdb/mlnode/constant.py         |   68 +-
 iotdb-core/mlnode/iotdb/mlnode/dataset/dataset.py  |   74 ++
 iotdb-core/mlnode/iotdb/mlnode/dataset/factory.py  |   50 +
 iotdb-core/mlnode/iotdb/mlnode/dataset/source.py   |   80 ++
 .../mlnode/dataset/utils/__init__.py}              |   10 -
 .../iotdb/mlnode/dataset/utils/time_features.py    |  149 +++
 iotdb-core/mlnode/iotdb/mlnode/exception.py        |   39 +-
 iotdb-core/mlnode/iotdb/mlnode/handler.py          |   63 +-
 iotdb-core/mlnode/iotdb/mlnode/parser.py           |   99 ++
 .../mlnode/process/__init__.py}                    |   10 -
 iotdb-core/mlnode/iotdb/mlnode/process/manager.py  |  124 ++
 iotdb-core/mlnode/iotdb/mlnode/process/task.py     |  265 ++++
 iotdb-core/mlnode/iotdb/mlnode/process/trial.py    |  224 ++++
 iotdb-core/mlnode/iotdb/mlnode/serde.py            |  147 ++-
 iotdb-core/mlnode/iotdb/mlnode/service.py          |    4 -
 iotdb-core/mlnode/iotdb/mlnode/storage.py          |   28 +-
 iotdb-core/mlnode/iotdb/mlnode/util.py             |   25 +-
 iotdb-core/mlnode/pom.xml                          |    2 -
 iotdb-core/mlnode/requirements.txt                 |    7 +-
 iotdb-core/mlnode/resources/conf/iotdb-mlnode.toml |   14 +-
 .../mlnode/test/test_create_forecast_model.py      |  100 ++
 iotdb-core/mlnode/test/test_model_storage.py       |   17 +-
 iotdb-core/mlnode/test/test_serde.py               |    8 +-
 iotdb-core/node-commons/pom.xml                    |    5 +-
 .../resources/conf/iotdb-common.properties         |   20 +-
 .../commons/auth/authorizer/BasicAuthorizer.java   |  153 +--
 .../iotdb/commons/auth/authorizer/IAuthorizer.java |   28 +-
 .../commons/auth/authorizer/OpenIdAuthorizer.java  |    2 +-
 .../iotdb/commons/auth/entity/PathPrivilege.java   |  152 ++-
 .../commons/auth/entity/PriPrivilegeType.java      |   91 ++
 .../iotdb/commons/auth/entity/PrivilegeType.java   |   92 +-
 .../org/apache/iotdb/commons/auth/entity/Role.java |  230 +++-
 .../org/apache/iotdb/commons/auth/entity/User.java |  171 ++-
 .../iotdb/commons/auth/role/BasicRoleManager.java  |  174 ++-
 .../iotdb/commons/auth/role/IRoleAccessor.java     |    9 +-
 .../iotdb/commons/auth/role/IRoleManager.java      |   19 +-
 .../commons/auth/role/LocalFileRoleAccessor.java   |  147 ++-
 .../commons/auth/role/LocalFileRoleManager.java    |    7 +
 .../iotdb/commons/auth/user/BasicUserManager.java  |  232 ++--
 .../iotdb/commons/auth/user/IUserAccessor.java     |    3 +
 .../iotdb/commons/auth/user/IUserManager.java      |   13 +-
 .../commons/auth/user/LocalFileUserAccessor.java   |  290 ++++-
 .../commons/auth/user/LocalFileUserManager.java    |    6 +
 .../iotdb/commons/client/ClientPoolFactory.java    |   26 +
 .../iotdb/commons/client/mlnode}/MLNodeClient.java |  144 ++-
 .../mlnode/MLNodeClientManager.java}               |   28 +-
 .../iotdb/commons/client/mlnode/MLNodeInfo.java}   |   11 +-
 .../client/property/ThriftClientProperty.java      |    3 +-
 .../concurrent/ExceptionalCountDownLatch.java      |   54 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |   69 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |   31 +-
 .../commons/consensus/index/ProgressIndex.java     |    4 +-
 .../consensus/index/impl/HybridProgressIndex.java  |    2 +-
 .../consensus/index/impl/MinimumProgressIndex.java |    8 +-
 .../exception/IllegalPrivilegeException.java       |   30 +-
 .../PipeRuntimeConnectorCriticalException.java     |    6 +-
 .../pipe/PipeRuntimeCriticalException.java         |    5 -
 .../pipe/PipeRuntimeNonCriticalException.java      |    5 -
 .../commons/model/ForecastModeInformation.java     |  153 +++
 .../iotdb/commons/model/ModelInformation.java      |  235 ++--
 ...TrailInformation.java => TrialInformation.java} |   34 +-
 .../iotdb/commons/partition/DataPartition.java     |   17 +-
 .../apache/iotdb/commons/path/PathPatternTree.java |   19 +-
 .../apache/iotdb/commons/path/fa/FAFactory.java    |    2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |   21 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |    2 +
 .../plugin/builtin/connector/OpcUaConnector.java}  |   23 +-
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |   26 +-
 .../iotdb/commons/schema/SchemaConstant.java       |   18 +-
 .../service/metric/JvmGcMonitorMetrics.java        |   35 +-
 .../iotdb/commons/service/metric/enums/Metric.java |    3 +
 .../udf/builtin/ModelInferenceFunction.java}       |   36 +-
 .../commons/udf/service/UDFManagementService.java  |   19 +
 .../org/apache/iotdb/commons/utils/AuthUtils.java  |  258 ++--
 .../org/apache/iotdb/commons/utils/IOUtils.java    |  115 +-
 .../iotdb/commons/utils/TimePartitionUtils.java    |   83 ++
 .../iotdb/commons/path/PathPatternTreeTest.java    |    9 +-
 .../iotdb/commons/pipe/PipeMetaDeSerTest.java      |    2 +-
 .../apache/iotdb/commons/utils/AuthUtilsTest.java  |  203 ++++
 iotdb-core/tsfile/pom.xml                          |    4 +-
 .../tsfile/file/metadata/enums/TSDataType.java     |   10 +
 .../iotdb/tsfile/read/common/block/TsBlock.java    |   20 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |    4 +-
 .../tsfile/write/writer/LocalTsFileOutput.java     |    6 +
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |    7 +
 .../iotdb/tsfile/write/writer/TsFileOutput.java    |    3 +
 .../tsfile/write/writer/TestTsFileOutput.java      |    5 +
 iotdb-protocol/openapi/pom.xml                     |    2 -
 iotdb-protocol/thrift-commons/pom.xml              |    1 -
 .../thrift-commons/src/main/thrift/common.thrift   |    5 +-
 iotdb-protocol/thrift-confignode/pom.xml           |    1 -
 .../src/main/thrift/confignode.thrift              |  123 +-
 iotdb-protocol/thrift-consensus/pom.xml            |    1 -
 iotdb-protocol/thrift-datanode/pom.xml             |    3 +-
 .../src/main/thrift/datanode.thrift                |    7 +-
 iotdb-protocol/thrift-mlnode/pom.xml               |    1 -
 .../thrift-mlnode/src/main/thrift/mlnode.thrift    |   12 +-
 library-udf/pom.xml                                |    3 -
 .../library/series/UDTFConsecutiveSequences.java   |   23 +-
 .../library/series/UDTFConsecutiveWindows.java     |   23 +-
 .../iotdb/library/series/util/ConsecutiveUtil.java |   23 +-
 pom.xml                                            |  235 +++-
 835 files changed, 34774 insertions(+), 10077 deletions(-)

diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
index f969a2746a6,00000000000..eaa9f427ecb
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
@@@ -1,70 -1,0 +1,71 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 +import org.apache.iotdb.commons.partition.DataPartition;
 +import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 +import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +
 +public class DataPartitionBatchFetcher {
 +
 +  private final IPartitionFetcher fetcher;
 +
 +  public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
 +    this.fetcher = fetcher;
 +  }
 +
 +  public List<TRegionReplicaSet> queryDataPartition(
-       List<Pair<String, TTimePartitionSlot>> slotList) {
++      List<Pair<String, TTimePartitionSlot>> slotList, String userName) {
 +    List<TRegionReplicaSet> replicaSets = new ArrayList<>();
 +    int size = slotList.size();
 +
 +    for (int i = 0; i < size; i += LoadTsFileScheduler.TRANSMIT_LIMIT) {
 +      List<Pair<String, TTimePartitionSlot>> subSlotList =
 +          slotList.subList(i, Math.min(size, i + LoadTsFileScheduler.TRANSMIT_LIMIT));
-       DataPartition dataPartition = fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
++      DataPartition dataPartition =
++          fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), userName);
 +      replicaSets.addAll(
 +          subSlotList.stream()
 +              .map(pair -> dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
 +              .collect(Collectors.toList()));
 +    }
 +    return replicaSets;
 +  }
 +
 +  private List<DataPartitionQueryParam> toQueryParam(List<Pair<String, TTimePartitionSlot>> slots) {
 +    return slots.stream()
 +        .collect(
 +            Collectors.groupingBy(
 +                Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
 +        .entrySet()
 +        .stream()
 +        .map(
 +            entry -> new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())))
 +        .collect(Collectors.toList());
 +  }
 +}
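
The merged queryDataPartition signature above now threads the requesting user through to getOrCreateDataPartition. Below is a minimal caller sketch, not part of the commit: it assumes ClusterPartitionFetcher still exposes a getInstance() singleton (the class appears in this merge's diffstat) and uses illustrative database and slot values.

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.db.queryengine.execution.load.DataPartitionBatchFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.tsfile.utils.Pair;

import java.util.Collections;
import java.util.List;

public class QueryDataPartitionSketch {
  public static void main(String[] args) {
    // Assumed singleton accessor; any IPartitionFetcher implementation works here.
    DataPartitionBatchFetcher batchFetcher =
        new DataPartitionBatchFetcher(ClusterPartitionFetcher.getInstance());

    // One (database, time-partition-slot) pair; values are illustrative only.
    TTimePartitionSlot slot = new TTimePartitionSlot().setStartTime(0L);
    List<Pair<String, TTimePartitionSlot>> slots =
        Collections.singletonList(new Pair<>("root.db", slot));

    // The merged signature passes the requesting user along so that
    // partition creation can be permission-checked per user.
    List<TRegionReplicaSet> replicaSets = batchFetcher.queryDataPartition(slots, "root");
    System.out.println(replicaSets);
  }
}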
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
index ed3f92ff925,00000000000..36448bd7aa1
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
@@@ -1,58 -1,0 +1,59 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 +
 +import java.io.File;
 +
 +/** Like TsFileDataManager, but each batch (TsFilePieceNode) contains chunks of a single device. */
 +public class DeviceBatchTsFileDataManager extends TsFileDataManager {
 +
 +  private String currentDeviceId;
 +
 +  public DeviceBatchTsFileDataManager(
 +      DispatchFunction dispatchFunction,
 +      PlanNodeId planNodeId,
 +      File targetFile,
 +      DataPartitionBatchFetcher partitionBatchFetcher,
-       long maxMemorySize) {
-     super(dispatchFunction, planNodeId, targetFile, partitionBatchFetcher, maxMemorySize);
++      long maxMemorySize,
++      String userName) {
++    super(dispatchFunction, planNodeId, targetFile, partitionBatchFetcher, maxMemorySize, userName);
 +  }
 +
 +  protected boolean addOrSendChunkData(ChunkData chunkData) {
 +    if (currentDeviceId != null && !currentDeviceId.equals(chunkData.getDevice())) {
 +      // a new device, flush previous data first
 +      boolean flushSucceed = flushChunkData(true);
 +      if (!flushSucceed) {
 +        return false;
 +      }
 +    }
 +    // add the chunk into the batch
 +    currentDeviceId = chunkData.getDevice();
 +    nonDirectionalChunkData.add(chunkData);
 +    dataSize += chunkData.getDataSize();
 +    if (dataSize > maxMemorySize) {
 +      return flushChunkData(false);
 +    }
 +
 +    return true;
 +  }
 +}
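
    (Editor's sketch, not part of the commit.) The addOrSendChunkData override above is a
    flush-on-key-change batcher: chunks accumulate while the device stays the same, and a
    device switch or an exceeded memory budget flushes the pending batch. The same pattern
    in a generic, self-contained form; all names are illustrative:

        import java.util.ArrayList;
        import java.util.List;
        import java.util.function.Function;
        import java.util.function.Predicate;
        import java.util.function.ToLongFunction;

        /** Generic flush-on-key-change batcher mirroring addOrSendChunkData above. */
        final class KeyedBatcher<T> {
          private final long maxBatchSize;
          private final Function<T, String> keyOf;
          private final ToLongFunction<T> sizeOf;
          private final Predicate<List<T>> flusher; // returns false if flushing fails
          private final List<T> pending = new ArrayList<>();
          private String currentKey;
          private long pendingSize;

          KeyedBatcher(
              long maxBatchSize,
              Function<T, String> keyOf,
              ToLongFunction<T> sizeOf,
              Predicate<List<T>> flusher) {
            this.maxBatchSize = maxBatchSize;
            this.keyOf = keyOf;
            this.sizeOf = sizeOf;
            this.flusher = flusher;
          }

          boolean add(T item) {
            String key = keyOf.apply(item);
            // a new key: flush the previous batch first, as the manager does on a device change
            if (currentKey != null && !currentKey.equals(key) && !flush()) {
              return false;
            }
            currentKey = key;
            pending.add(item);
            pendingSize += sizeOf.applyAsLong(item);
            // over budget: flush the current batch as well
            return pendingSize <= maxBatchSize || flush();
          }

          private boolean flush() {
            boolean ok = flusher.test(new ArrayList<>(pending));
            pending.clear();
            pendingSize = 0;
            return ok;
          }
        }
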
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 6b055c55db5,97542ceab24..5baf1de9314
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@@ -47,8 -45,8 +45,10 @@@ import java.nio.file.DirectoryNotEmptyE
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.util.HashMap;
++import java.util.HashSet;
  import java.util.Map;
  import java.util.Objects;
++import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledFuture;
@@@ -218,13 -214,8 +218,14 @@@ public class LoadTsFileManager 
        }
      }
  
 +    // method is synchronized because the chunks in a chunk group may be sent in parallel
      @SuppressWarnings("squid:S3824")
-     private synchronized void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
 -    private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
++    private synchronized void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
++        throws IOException {
 +      // ensure that retransmission will not result in writing duplicated data
 +      if (receivedSplitIds.contains(chunkData.getSplitId())) {
 +        return;
 +      }
        if (isClosed) {
          throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
        }
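
    (Editor's sketch, not part of the commit.) The guard added to write() makes
    retransmitted chunk data idempotent: every split carries a unique id, assigned by
    MergedTsFileSplitter below, and a split already present in receivedSplitIds is silently
    dropped (the hunk shows only the contains() check; the id is presumably recorded after
    a successful write, outside the shown context). The idea in isolation; class and method
    names are illustrative:

        import java.util.HashSet;
        import java.util.Set;

        /** Sketch: idempotent writes keyed by split id, tolerating retransmission. */
        class IdempotentSplitWriter {
          private final Set<Integer> receivedSplitIds = new HashSet<>();

          synchronized void write(int splitId, Runnable doWrite) {
            // Set.add returns false for a duplicate, so a retransmitted split is a no-op.
            if (receivedSplitIds.add(splitId)) {
              doWrite.run();
            }
          }
        }
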
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
index 2f9464c0830,00000000000..08cfc649d3b
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@@ -1,766 -1,0 +1,765 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
 +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
 +import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 +import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
 +import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 +import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 +import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
 +import org.apache.iotdb.tsfile.file.MetaMarker;
 +import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 +import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 +import org.apache.iotdb.tsfile.file.header.PageHeader;
 +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 +import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 +import org.apache.iotdb.tsfile.read.common.BatchData;
 +import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 +import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
 +import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.DataOutputStream;
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.NoSuchElementException;
 +import java.util.PriorityQueue;
 +import java.util.Set;
 +import java.util.TreeMap;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.CancellationException;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.LinkedBlockingDeque;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Function;
 +
 +public class MergedTsFileSplitter {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(MergedTsFileSplitter.class);
 +
 +  private final List<File> tsFiles;
 +  private final Function<TsFileData, Boolean> consumer;
 +  private final PriorityQueue<SplitTask> taskPriorityQueue;
 +  private final int maxConcurrentFileNum;
 +  private ExecutorService asyncExecutor;
 +  private long timePartitionInterval;
 +  private AtomicInteger splitIdGenerator = new AtomicInteger();
 +  private Statistic statistic = new Statistic();
 +
 +  public MergedTsFileSplitter(
 +      List<File> tsFiles,
 +      Function<TsFileData, Boolean> consumer,
 +      ExecutorService asyncExecutor,
 +      long timePartitionInterval,
 +      int maxConcurrentFileNum) {
 +    this.tsFiles = tsFiles;
 +    this.consumer = consumer;
 +    this.asyncExecutor = asyncExecutor;
 +    this.timePartitionInterval = timePartitionInterval;
 +    this.maxConcurrentFileNum = maxConcurrentFileNum;
 +    taskPriorityQueue = new PriorityQueue<>();
 +  }
 +
 +  public void splitTsFileByDataPartition() throws IOException, IllegalStateException {
 +    long startTime = System.nanoTime();
 +    int i = 0;
 +    for (; i < tsFiles.size(); i++) {
 +      // allow at most maxConcurrentFileNum files to be merged at the same time
 +      if (taskPriorityQueue.size() >= maxConcurrentFileNum) {
 +        break;
 +      }
 +
 +      File tsFile = tsFiles.get(i);
 +      SplitTask splitTask = new SplitTask(tsFile, asyncExecutor, i);
 +      logger.info("Start to split {}", tsFiles.get(i));
 +      if (splitTask.hasNext()) {
 +        taskPriorityQueue.add(splitTask);
 +      }
 +    }
 +    statistic.initTime = System.nanoTime() - startTime;
 +
 +    while (!taskPriorityQueue.isEmpty()) {
 +      startTime = System.nanoTime();
 +      SplitTask task = taskPriorityQueue.poll();
 +      TsFileData tsFileData = task.removeNext();
 +      statistic.fetchDataTime += System.nanoTime() - startTime;
 +      tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
 +
 +      startTime = System.nanoTime();
 +      consumer.apply(tsFileData);
 +      statistic.consumeTime += System.nanoTime() - startTime;
 +
- 
 +      startTime = System.nanoTime();
 +      if (task.hasNext()) {
 +        taskPriorityQueue.add(task);
 +      } else {
 +        // when a file is exhausted, add the next non-empty file
 +        for (; i < tsFiles.size(); i++) {
 +          SplitTask splitTask = new SplitTask(tsFiles.get(i), asyncExecutor, i);
 +          logger.info("Start to split {}", tsFiles.get(i));
 +          if (splitTask.hasNext()) {
 +            taskPriorityQueue.add(splitTask);
 +            i++;
 +            break;
 +          }
 +        }
 +      }
 +      statistic.enqueueTime += System.nanoTime() - startTime;
 +    }
 +  }
 +
 +  public void close() throws IOException {
-     logger.info("Init/FetchData/Consume/Enqueue Time: {}/{}/{}/{}ms"
-         , statistic.initTime / 1_000_000L
-         , statistic.fetchDataTime / 1_000_000L
-         , statistic.consumeTime / 1_000_000L
-         , statistic.enqueueTime / 1_000_000L);
++    logger.info(
++        "Init/FetchData/Consume/Enqueue Time: {}/{}/{}/{}ms",
++        statistic.initTime / 1_000_000L,
++        statistic.fetchDataTime / 1_000_000L,
++        statistic.consumeTime / 1_000_000L,
++        statistic.enqueueTime / 1_000_000L);
 +    for (SplitTask task : taskPriorityQueue) {
 +      task.close();
 +    }
 +    taskPriorityQueue.clear();
 +    logger.info("Splitter closed.");
 +  }
 +
 +  private class Statistic {
 +    private long initTime;
 +    private long fetchDataTime;
 +    private long consumeTime;
 +    private long enqueueTime;
 +  }
 +
 +  private class SplitTask implements Comparable<SplitTask> {
 +
 +    private final File tsFile;
 +    private final int fileId;
 +    private TsFileSequenceReader reader;
 +    private final TreeMap<Long, List<Deletion>> offset2Deletions;
 +
 +    private String curDevice;
 +    private boolean isTimeChunkNeedDecode;
 +    private Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData;
 +    private Map<Integer, long[]> pageIndex2Times;
 +    private Map<Long, IChunkMetadata> offset2ChunkMetadata;
 +
 +    private BlockingQueue<TsFileData> nextSplits;
 +    private TsFileData nextSplit;
 +    private byte marker = -1;
 +
 +    private ExecutorService asyncExecutor;
 +    private Future<Void> asyncTask;
 +
 +    public SplitTask(File tsFile, ExecutorService asyncExecutor, int fileId) throws IOException {
 +      this.tsFile = tsFile;
 +      this.fileId = fileId;
 +      this.asyncExecutor = asyncExecutor;
 +      offset2Deletions = new TreeMap<>();
 +      init();
 +    }
 +
 +    @Override
 +    public int compareTo(SplitTask o) {
 +      try {
 +        TsFileData thisNext = showNext();
 +        TsFileData thatNext = o.showNext();
 +        // output modifications first
 +        if (thisNext.isModification() && thatNext.isModification()) {
 +          return 0;
 +        }
 +        if (thisNext.isModification()) {
 +          return 1;
 +        }
 +        if (thatNext.isModification()) {
 +          return -1;
 +        }
 +
 +        ChunkData thisChunk = (ChunkData) thisNext;
 +        ChunkData thatChunk = ((ChunkData) thatNext);
 +        Comparator<ChunkData> chunkDataComparator =
 +            Comparator.comparing(ChunkData::getDevice, String::compareTo)
 +                .thenComparing(ChunkData::firstMeasurement, String::compareTo);
 +        int chunkCompare = chunkDataComparator.compare(thisChunk, thatChunk);
 +        if (chunkCompare != 0) {
 +          return chunkCompare;
 +        }
 +        return Integer.compare(this.fileId, o.fileId);
 +      } catch (IOException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    private void init() throws IOException {
 +      this.reader = new TsFileSequenceReader(tsFile.getAbsolutePath());
 +      getAllModification(offset2Deletions);
 +
 +      if (!checkMagic(reader)) {
 +        throw new TsFileRuntimeException(
 +            String.format("Magic String check error when parsing TsFile %s.", tsFile.getPath()));
 +      }
 +      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
 +
 +      curDevice = null;
 +      isTimeChunkNeedDecode = true;
 +      pageIndex2ChunkData = new HashMap<>();
 +      pageIndex2Times = null;
 +      offset2ChunkMetadata = new HashMap<>();
 +      getChunkMetadata(reader, offset2ChunkMetadata);
 +
 +      nextSplits = new LinkedBlockingDeque<>(64);
 +      if (asyncExecutor != null) {
 +        asyncTask =
 +            asyncExecutor.submit(
 +                () -> {
 +                  try {
 +                    asyncComputeNext();
 +                  } catch (Throwable e) {
 +                    logger.info("Exception during splitting", e);
 +                    throw e;
 +                  }
 +                  return null;
 +                });
 +      }
 +    }
 +
 +    private void asyncComputeNext() throws IOException {
 +      while (reader != null && !Thread.interrupted()) {
 +        computeNext();
 +      }
 +      logger.info("{} ends splitting", tsFile);
 +    }
 +
 +    public boolean hasNext() throws IOException {
 +      if (nextSplit != null && !(nextSplit instanceof EmptyTsFileData)) {
 +        return true;
 +      }
 +      if (reader == null && nextSplits.isEmpty()) {
 +        return false;
 +      }
 +
 +      if (asyncExecutor == null) {
 +        computeNext();
 +        if (!nextSplits.isEmpty()) {
 +          nextSplit = nextSplits.poll();
 +          return true;
 +        } else {
 +          return false;
 +        }
 +      } else {
 +        try {
 +          nextSplit = nextSplits.take();
 +        } catch (InterruptedException e) {
 +          return false;
 +        }
 +        return !(nextSplit instanceof EmptyTsFileData);
 +      }
 +    }
 +
 +    public TsFileData removeNext() throws IOException {
 +      if (!hasNext()) {
 +        throw new NoSuchElementException();
 +      }
 +      TsFileData split = nextSplit;
 +      nextSplit = null;
 +      return split;
 +    }
 +
 +    public TsFileData showNext() throws IOException {
 +      if (!hasNext()) {
 +        throw new NoSuchElementException();
 +      }
 +      return nextSplit;
 +    }
 +
 +    public void close() throws IOException {
 +      if (asyncTask != null) {
 +        asyncTask.cancel(true);
 +        try {
 +          asyncTask.get();
 +        } catch (CancellationException ignored) {
 +
 +        } catch (InterruptedException | ExecutionException e) {
 +          throw new IOException(e);
 +        }
 +      }
 +      if (reader != null) {
 +        reader.close();
 +        reader = null;
 +      }
 +    }
 +
 +    private byte nextMarker() throws IOException {
 +      if (marker != -1) {
 +        // inherit the marker from the previous breakpoint
 +        // e.g. the marker after processing a chunk
 +        return marker;
 +      }
 +      return marker = reader.readMarker();
 +    }
 +
 +    private void insertNewChunk(ChunkData chunkData) throws IOException {
 +      if (asyncExecutor == null) {
 +        nextSplits.add(chunkData);
 +      } else {
 +        try {
 +          nextSplits.put(chunkData);
 +        } catch (InterruptedException e) {
 +          throw new IOException(e);
 +        }
 +      }
 +    }
 +
 +    @SuppressWarnings({"squid:S3776", "squid:S6541"})
 +    private void computeNext() throws IOException, IllegalStateException {
 +      if (reader == null) {
 +        return;
 +      }
 +
 +      while (nextMarker() != MetaMarker.SEPARATOR) {
 +        switch (marker) {
 +          case MetaMarker.CHUNK_HEADER:
 +          case MetaMarker.TIME_CHUNK_HEADER:
 +          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
 +          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
 +            long chunkOffset = reader.position();
 +            boolean chunkDataGenerated =
 +                consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
 +            handleModification(offset2Deletions, chunkOffset);
 +            if (chunkDataGenerated) {
 +              return;
 +            }
 +
 +            ChunkHeader header = reader.readChunkHeader(marker);
 +            if (header.getDataSize() == 0) {
 +              throw new TsFileRuntimeException(
 +                  String.format(
 +                      "Empty Nonaligned Chunk or Time Chunk with offset %d in TsFile %s.",
 +                      chunkOffset, tsFile.getPath()));
 +            }
 +
 +            boolean isAligned =
 +                ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
 +                    == TsFileConstant.TIME_COLUMN_MASK);
 +            IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
 +            TTimePartitionSlot timePartitionSlot =
-                 TimePartitionUtils.getTimePartition(
++                TimePartitionUtils.getTimePartitionSlot(
 +                    chunkMetadata.getStartTime(), timePartitionInterval);
 +            ChunkData chunkData =
 +                ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
 +
 +            if (!needDecodeChunk(chunkMetadata)) {
 +              chunkData.setNotDecode();
 +              chunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()), chunkMetadata);
 +              if (isAligned) {
 +                isTimeChunkNeedDecode = false;
 +                pageIndex2ChunkData
 +                    .computeIfAbsent(1, o -> new ArrayList<>())
 +                    .add((AlignedChunkData) chunkData);
 +              } else {
 +                insertNewChunk(chunkData);
 +              }
 +              break;
 +            }
 +
 +            Decoder defaultTimeDecoder =
 +                Decoder.getDecoderByType(
 +                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
 +                    TSDataType.INT64);
 +            Decoder valueDecoder =
 +                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
 +            int dataSize = header.getDataSize();
 +            int pageIndex = 0;
 +            if (isAligned) {
 +              isTimeChunkNeedDecode = true;
 +              pageIndex2Times = new HashMap<>();
 +            }
 +
 +            while (dataSize > 0) {
 +              PageHeader pageHeader =
 +                  reader.readPageHeader(
 +                      header.getDataType(),
 +                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
 +              long pageDataSize = pageHeader.getSerializedPageSize();
 +              if (!needDecodePage(pageHeader, chunkMetadata)) { // an entire page
 +                long startTime =
 +                    pageHeader.getStatistics() == null
 +                        ? chunkMetadata.getStartTime()
 +                        : pageHeader.getStartTime();
 +                TTimePartitionSlot pageTimePartitionSlot =
-                     TimePartitionUtils.getTimePartition(startTime, timePartitionInterval);
++                    TimePartitionUtils.getTimePartitionSlot(startTime, timePartitionInterval);
 +                if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
 +                  if (!isAligned) {
 +                    insertNewChunk(chunkData);
 +                  }
 +                  timePartitionSlot = pageTimePartitionSlot;
 +                  chunkData =
 +                      ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
 +                }
 +                if (isAligned) {
 +                  pageIndex2ChunkData
 +                      .computeIfAbsent(pageIndex, o -> new ArrayList<>())
 +                      .add((AlignedChunkData) chunkData);
 +                }
 +                chunkData.writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader));
 +              } else { // split page
 +                ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
 +                Pair<long[], Object[]> tvArray =
 +                    decodePage(
 +                        isAligned, pageData, pageHeader, defaultTimeDecoder, valueDecoder, header);
 +                long[] times = tvArray.left;
 +                Object[] values = tvArray.right;
 +                if (isAligned) {
 +                  pageIndex2Times.put(pageIndex, times);
 +                }
 +
 +                int start = 0;
 +                long endTime = timePartitionSlot.getStartTime() + timePartitionInterval;
 +                for (int i = 0; i < times.length; i++) {
 +                  if (times[i] >= endTime) {
 +                    chunkData.writeDecodePage(times, values, start, i);
 +                    if (isAligned) {
 +                      pageIndex2ChunkData
 +                          .computeIfAbsent(pageIndex, o -> new ArrayList<>())
 +                          .add((AlignedChunkData) chunkData);
 +                    } else {
 +                      insertNewChunk(chunkData);
 +                    }
 +
 +                    timePartitionSlot =
-                         TimePartitionUtils.getTimePartition(times[i], timePartitionInterval);
++                        TimePartitionUtils.getTimePartitionSlot(times[i], timePartitionInterval);
 +                    endTime = timePartitionSlot.getStartTime() + timePartitionInterval;
 +                    chunkData =
 +                        ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
 +                    start = i;
 +                  }
 +                }
 +                chunkData.writeDecodePage(times, values, start, times.length);
 +                if (isAligned) {
 +                  pageIndex2ChunkData
 +                      .computeIfAbsent(pageIndex, o -> new ArrayList<>())
 +                      .add((AlignedChunkData) chunkData);
 +                }
 +              }
 +
 +              pageIndex += 1;
 +              dataSize -= pageDataSize;
 +            }
 +
 +            if (!isAligned) {
 +              insertNewChunk(chunkData);
 +            }
 +            break;
 +          case MetaMarker.VALUE_CHUNK_HEADER:
 +          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
 +            chunkOffset = reader.position();
 +            chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
 +            header = reader.readChunkHeader(marker);
 +            if (header.getDataSize() == 0) {
 +              handleEmptyValueChunk(
 +                  header, pageIndex2ChunkData, chunkMetadata, isTimeChunkNeedDecode);
 +              break;
 +            }
 +
 +            if (!isTimeChunkNeedDecode) {
 +              AlignedChunkData alignedChunkData = pageIndex2ChunkData.get(1).get(0);
 +              alignedChunkData.addValueChunk(header);
 +              alignedChunkData.writeEntireChunk(
 +                  reader.readChunk(-1, header.getDataSize()), chunkMetadata);
 +              break;
 +            }
 +
 +            Set<ChunkData> allChunkData = new HashSet<>();
 +            dataSize = header.getDataSize();
 +            pageIndex = 0;
 +            valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
 +
 +            while (dataSize > 0) {
 +              PageHeader pageHeader =
 +                  reader.readPageHeader(
 +                      header.getDataType(),
 +                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
 +              List<AlignedChunkData> alignedChunkDataList = pageIndex2ChunkData.get(pageIndex);
 +              for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
 +                if (!allChunkData.contains(alignedChunkData)) {
 +                  alignedChunkData.addValueChunk(header);
 +                  allChunkData.add(alignedChunkData);
 +                }
 +              }
 +              if (alignedChunkDataList.size() == 1) { // write entire page
 +                // write the entire page if it's not an empty page.
 +                alignedChunkDataList
 +                    .get(0)
 +                    .writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader));
 +              } else { // decode page
 +                long[] times = pageIndex2Times.get(pageIndex);
 +                TsPrimitiveType[] values =
 +                    decodeValuePage(reader, header, pageHeader, times, valueDecoder);
 +                for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
 +                  alignedChunkData.writeDecodeValuePage(times, values, header.getDataType());
 +                }
 +              }
 +              long pageDataSize = pageHeader.getSerializedPageSize();
 +              pageIndex += 1;
 +              dataSize -= pageDataSize;
 +            }
 +            break;
 +          case MetaMarker.CHUNK_GROUP_HEADER:
 +            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
 +            curDevice = chunkGroupHeader.getDeviceID();
 +            break;
 +          case MetaMarker.OPERATION_INDEX_RANGE:
 +            reader.readPlanIndex();
 +            break;
 +          default:
 +            MetaMarker.handleUnexpectedMarker(marker);
 +        }
 +        marker = -1;
 +        if (!nextSplits.isEmpty()) {
 +          return;
 +        }
 +      }
 +
 +      consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData);
 +      handleModification(offset2Deletions, Long.MAX_VALUE);
 +      close();
 +      if (asyncExecutor != null) {
 +        try {
 +          nextSplits.put(new EmptyTsFileData());
 +        } catch (InterruptedException e) {
 +          asyncTask.cancel(true);
 +        }
 +      }
 +    }
 +
 +    private class EmptyTsFileData implements TsFileData {
 +
 +      @Override
 +      public long getDataSize() {
 +        return 0;
 +      }
 +
 +      @Override
-       public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
-       }
++      public void writeToFileWriter(TsFileIOWriter writer) throws IOException {}
 +
 +      @Override
 +      public boolean isModification() {
 +        return false;
 +      }
 +
 +      @Override
-       public void serialize(DataOutputStream stream) throws IOException {
-       }
++      public void serialize(DataOutputStream stream) throws IOException {}
 +
 +      @Override
 +      public int getSplitId() {
 +        return 0;
 +      }
 +
 +      @Override
-       public void setSplitId(int sid) {
-       }
++      public void setSplitId(int sid) {}
 +    }
 +
 +    private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) throws IOException {
 +      try (ModificationFile modificationFile =
 +          new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
 +        for (Modification modification : modificationFile.getModifications()) {
 +          offset2Deletions
 +              .computeIfAbsent(modification.getFileOffset(), o -> new ArrayList<>())
 +              .add((Deletion) modification);
 +        }
 +      }
 +    }
 +
 +    private boolean checkMagic(TsFileSequenceReader reader) throws IOException {
 +      String magic = reader.readHeadMagic();
 +      if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
 +        logger.error("the file's MAGIC STRING is incorrect, file path: {}", reader.getFileName());
 +        return false;
 +      }
 +
 +      byte versionNumber = reader.readVersionNumber();
 +      if (versionNumber != TSFileConfig.VERSION_NUMBER) {
 +        logger.error("the file's Version Number is incorrect, file path: {}", reader.getFileName());
 +        return false;
 +      }
 +
 +      if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
 +        logger.error("the file is not closed correctly, file path: {}", reader.getFileName());
 +        return false;
 +      }
 +      return true;
 +    }
 +
 +    private void getChunkMetadata(
 +        TsFileSequenceReader reader, Map<Long, IChunkMetadata> offset2ChunkMetadata)
 +        throws IOException {
 +      Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
 +      for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
 +        for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
 +          for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
 +            offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), chunkMetadata);
 +          }
 +        }
 +      }
 +    }
 +
 +    private void handleModification(
 +        TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) {
 +      while (!offset2Deletions.isEmpty() && offset2Deletions.firstEntry().getKey() <= chunkOffset) {
 +        offset2Deletions
 +            .pollFirstEntry()
 +            .getValue()
 +            .forEach(o -> nextSplits.add(new DeletionData(o)));
 +      }
 +    }
 +
 +    private boolean consumeAllAlignedChunkData(
 +        long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
 +      if (pageIndex2ChunkData.isEmpty()) {
 +        return false;
 +      }
 +
 +      Set<ChunkData> allChunkData = new HashSet<>();
 +      for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) {
 +        allChunkData.addAll(entry.getValue());
 +      }
 +      for (ChunkData chunkData : allChunkData) {
 +        try {
 +          nextSplits.put(chunkData);
 +        } catch (InterruptedException e) {
 +          return false;
 +        }
 +      }
 +      pageIndex2ChunkData.clear();
 +      return true;
 +    }
 +
 +    private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
-       return !TimePartitionUtils.getTimePartition(
++      return !TimePartitionUtils.getTimePartitionSlot(
 +              chunkMetadata.getStartTime(), timePartitionInterval)
 +          .equals(
-               TimePartitionUtils.getTimePartition(
++              TimePartitionUtils.getTimePartitionSlot(
 +                  chunkMetadata.getEndTime(), timePartitionInterval));
 +    }
 +
 +    private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetadata) {
 +      if (pageHeader.getStatistics() == null) {
-         return !TimePartitionUtils.getTimePartition(
++        return !TimePartitionUtils.getTimePartitionSlot(
 +                chunkMetadata.getStartTime(), timePartitionInterval)
 +            .equals(
-                 TimePartitionUtils.getTimePartition(
++                TimePartitionUtils.getTimePartitionSlot(
 +                    chunkMetadata.getEndTime(), timePartitionInterval));
 +      }
-       return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime(), timePartitionInterval)
++      return !TimePartitionUtils.getTimePartitionSlot(
++              pageHeader.getStartTime(), timePartitionInterval)
 +          .equals(
-               TimePartitionUtils.getTimePartition(pageHeader.getEndTime(), timePartitionInterval));
++              TimePartitionUtils.getTimePartitionSlot(
++                  pageHeader.getEndTime(), timePartitionInterval));
 +    }
 +
 +    private Pair<long[], Object[]> decodePage(
 +        boolean isAligned,
 +        ByteBuffer pageData,
 +        PageHeader pageHeader,
 +        Decoder timeDecoder,
 +        Decoder valueDecoder,
 +        ChunkHeader chunkHeader)
 +        throws IOException {
 +      if (isAligned) {
 +        TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder);
 +        long[] times = timePageReader.getNextTimeBatch();
 +        return new Pair<>(times, new Object[times.length]);
 +      }
 +
 +      valueDecoder.reset();
 +      PageReader pageReader =
 +          new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null);
 +      BatchData batchData = pageReader.getAllSatisfiedPageData();
 +      long[] times = new long[batchData.length()];
 +      Object[] values = new Object[batchData.length()];
 +      int index = 0;
 +      while (batchData.hasCurrent()) {
 +        times[index] = batchData.currentTime();
 +        values[index++] = batchData.currentValue();
 +        batchData.next();
 +      }
 +      return new Pair<>(times, values);
 +    }
 +
 +    private void handleEmptyValueChunk(
 +        ChunkHeader header,
 +        Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData,
 +        IChunkMetadata chunkMetadata,
 +        boolean isTimeChunkNeedDecode)
 +        throws IOException {
 +      Set<ChunkData> allChunkData = new HashSet<>();
 +      for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) {
 +        for (AlignedChunkData alignedChunkData : entry.getValue()) {
 +          if (!allChunkData.contains(alignedChunkData)) {
 +            alignedChunkData.addValueChunk(header);
 +            if (!isTimeChunkNeedDecode) {
 +              alignedChunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
 +            }
 +            allChunkData.add(alignedChunkData);
 +          }
 +        }
 +      }
 +    }
 +
 +    private TsPrimitiveType[] decodeValuePage(
 +        TsFileSequenceReader reader,
 +        ChunkHeader chunkHeader,
 +        PageHeader pageHeader,
 +        long[] times,
 +        Decoder valueDecoder)
 +        throws IOException {
 +      if (pageHeader.getSerializedPageSize() == 0) {
 +        return new TsPrimitiveType[times.length];
 +      }
 +
 +      valueDecoder.reset();
 +      ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType());
 +      ValuePageReader valuePageReader =
 +          new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
 +      // times should be the original time array, so recording the satisfied length is necessary
 +      return valuePageReader.nextValueBatch(times);
 +    }
 +  }
 +}
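
    (Editor's sketch, not part of the commit.) splitTsFileByDataPartition is a k-way merge:
    each open file is a SplitTask whose head split is compared (modifications first, then
    device, then first measurement, then file id), and the priority queue always emits the
    globally smallest head before re-queuing the task. The underlying merge pattern as a
    self-contained sketch; names are illustrative:

        import java.util.ArrayList;
        import java.util.Iterator;
        import java.util.List;
        import java.util.PriorityQueue;

        /** K-way merge over ordered sources, as the splitter does over SplitTasks. */
        final class KWayMerge {
          static <T extends Comparable<T>> List<T> merge(List<Iterator<T>> sources) {
            // queue entries are ranked by their current head element
            PriorityQueue<PeekingSource<T>> queue = new PriorityQueue<>();
            for (Iterator<T> source : sources) {
              if (source.hasNext()) {
                queue.add(new PeekingSource<>(source));
              }
            }
            List<T> merged = new ArrayList<>();
            while (!queue.isEmpty()) {
              PeekingSource<T> s = queue.poll(); // source with the smallest head
              merged.add(s.head);
              if (s.advance()) {
                queue.add(s); // re-insert so the new head is re-ranked
              }
            }
            return merged;
          }

          private static final class PeekingSource<T extends Comparable<T>>
              implements Comparable<PeekingSource<T>> {
            private final Iterator<T> it;
            private T head;

            PeekingSource(Iterator<T> it) {
              this.it = it;
              this.head = it.next();
            }

            boolean advance() {
              if (it.hasNext()) {
                head = it.next();
                return true;
              }
              return false;
            }

            @Override
            public int compareTo(PeekingSource<T> o) {
              return head.compareTo(o.head);
            }
          }
        }
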
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
index bf1c79172b3,00000000000..fa5aa285ed7
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
@@@ -1,169 -1,0 +1,173 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.util.ArrayList;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +/**
 + * TsFileDataManager batches the splits generated by TsFileSplitter into LoadTsFilePieceNodes,
 + * routes each split to its associated replica set, and sends the piece nodes to the replicas
 + * with the provided dispatch function.
 + */
 +@SuppressWarnings("BooleanMethodIsAlwaysInverted")
 +public class TsFileDataManager {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(TsFileDataManager.class);
 +  protected final long maxMemorySize;
 +  protected final DispatchFunction dispatchFunction;
 +  private final DataPartitionBatchFetcher partitionBatchFetcher;
 +  protected final PlanNodeId planNodeId;
 +  protected final File targetFile;
 +
 +  protected long dataSize;
 +  protected final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
 +  protected final List<ChunkData> nonDirectionalChunkData;
++  protected final String userName;
 +
 +  @FunctionalInterface
 +  public interface DispatchFunction {
 +
 +    boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet);
 +  }
 +
 +  public TsFileDataManager(
 +      DispatchFunction dispatchFunction,
 +      PlanNodeId planNodeId,
 +      File targetFile,
 +      DataPartitionBatchFetcher partitionBatchFetcher,
-       long maxMemorySize) {
++      long maxMemorySize,
++      String userName) {
 +    this.dispatchFunction = dispatchFunction;
 +    this.planNodeId = planNodeId;
 +    this.targetFile = targetFile;
 +    this.dataSize = 0;
 +    this.replicaSet2Piece = new HashMap<>();
 +    this.nonDirectionalChunkData = new ArrayList<>();
 +    this.partitionBatchFetcher = partitionBatchFetcher;
 +    this.maxMemorySize = maxMemorySize;
++    this.userName = userName;
 +  }
 +
 +  public boolean addOrSendTsFileData(TsFileData tsFileData) {
 +    return tsFileData.isModification()
 +        ? addOrSendDeletionData(tsFileData)
 +        : addOrSendChunkData((ChunkData) tsFileData);
 +  }
 +
 +  protected boolean addOrSendChunkData(ChunkData chunkData) {
 +    nonDirectionalChunkData.add(chunkData);
 +    dataSize += chunkData.getDataSize();
 +
 +    if (dataSize > maxMemorySize) {
 +      return flushChunkData(false);
 +    }
 +
 +    return true;
 +  }
 +
 +  protected boolean flushChunkData(boolean flushAll) {
 +    routeChunkData();
 +
 +    // start to dispatch from the biggest TsFilePieceNode
 +    List<TRegionReplicaSet> sortedReplicaSets =
 +        replicaSet2Piece.keySet().stream()
 +            .sorted(Comparator.comparingLong(o -> replicaSet2Piece.get(o).getDataSize()).reversed())
 +            .collect(Collectors.toList());
 +
 +    for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
 +      LoadTsFilePieceNode pieceNode = replicaSet2Piece.get(sortedReplicaSet);
 +      if (pieceNode.getDataSize() == 0) { // pieces are sorted by size, so the rest are empty too
 +        break;
 +      }
 +      if (!dispatchFunction.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
 +        return false;
 +      }
 +
 +      dataSize -= pieceNode.getDataSize();
 +      replicaSet2Piece.put(
 +          sortedReplicaSet,
 +          new LoadTsFilePieceNode(
 +              planNodeId, targetFile)); // can not just remove, because of deletion
 +      if (!flushAll && dataSize <= maxMemorySize) {
 +        break;
 +      }
 +    }
 +    return true;
 +  }
 +
 +  protected void routeChunkData() {
 +    if (nonDirectionalChunkData.isEmpty()) {
 +      return;
 +    }
 +
 +    List<TRegionReplicaSet> replicaSets =
 +        partitionBatchFetcher.queryDataPartition(
 +            nonDirectionalChunkData.stream()
 +                .map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
-                 .collect(Collectors.toList()));
++                .collect(Collectors.toList()),
++            userName);
 +    IntStream.range(0, nonDirectionalChunkData.size())
 +        .forEach(
 +            i ->
 +                replicaSet2Piece
 +                    .computeIfAbsent(
 +                        replicaSets.get(i), o -> new LoadTsFilePieceNode(planNodeId, targetFile))
 +                    .addTsFileData(nonDirectionalChunkData.get(i)));
 +    nonDirectionalChunkData.clear();
 +  }
 +
 +  private boolean addOrSendDeletionData(TsFileData deletionData) {
 +    routeChunkData(); // ensure chunk data will be added before deletion
 +
 +    for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
 +      dataSize += deletionData.getDataSize();
 +      entry.getValue().addTsFileData(deletionData);
 +    }
 +    return true;
 +  }
 +
 +  public boolean sendAllTsFileData() {
 +    routeChunkData();
 +
 +    for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
 +      if (entry.getValue().getDataSize() > 0
 +          && !dispatchFunction.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
 +        logger.warn("Dispatch piece node {} of TsFile {} error.", entry.getValue(), targetFile);
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +}
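
    (Editor's sketch, not part of the commit.) flushChunkData drains the per-replica-set
    piece nodes largest-first once the buffered size exceeds maxMemorySize, stopping as soon
    as the total is back under budget unless flushAll is set. The flush policy in a generic
    sketch; names are illustrative:

        import java.util.ArrayList;
        import java.util.Comparator;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.stream.Collectors;

        /** Budgeted buckets drained largest-first, mirroring flushChunkData above. */
        final class BudgetedBuckets<K> {
          private final Map<K, Long> bucketSizes = new HashMap<>();
          private long total;

          void add(K key, long size) {
            bucketSizes.merge(key, size, Long::sum);
            total += size;
          }

          /** Keys to flush, biggest bucket first, until back under budget. */
          List<K> keysToFlush(long budget, boolean flushAll) {
            List<K> sorted =
                bucketSizes.keySet().stream()
                    .sorted(Comparator.comparingLong((K k) -> bucketSizes.get(k)).reversed())
                    .collect(Collectors.toList());
            List<K> result = new ArrayList<>();
            long remaining = total;
            for (K key : sorted) {
              if (!flushAll && remaining <= budget) {
                break; // back under budget; keep the rest buffered
              }
              result.add(key);
              remaining -= bucketSizes.get(key);
            }
            return result;
          }
        }

    In the real manager a flushed piece node is replaced by an empty one rather than
    removed, so later deletions can still be attached to its replica set.
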
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index 2f09a003106,00000000000..d6f0ac87e79
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@@ -1,464 -1,0 +1,474 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
- import java.util.ArrayDeque;
- import java.util.Queue;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
 +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 +import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.common.rpc.thrift.TSStatus;
 +import org.apache.iotdb.commons.client.IClientManager;
 +import org.apache.iotdb.commons.client.exception.ClientManagerException;
 +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 +import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 +import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 +import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationSequencer;
 +import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationStatistics;
 +import org.apache.iotdb.db.queryengine.execution.load.locseq.ThroughputBasedLocationSequencer;
 +import org.apache.iotdb.db.queryengine.execution.load.nodesplit.ClusteringMeasurementSplitter;
- import org.apache.iotdb.db.queryengine.execution.load.nodesplit.OrderedMeasurementSplitter;
 +import org.apache.iotdb.db.queryengine.execution.load.nodesplit.PieceNodeSplitter;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 +import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 +import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 +import org.apache.iotdb.rpc.TSStatusCode;
 +import org.apache.iotdb.tsfile.compress.ICompressor;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import org.apache.thrift.TException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
++import java.util.ArrayDeque;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
++import java.util.Queue;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentSkipListSet;
++import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Future;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +
 +import static org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
 +
 +public class TsFileSplitSender {
 +
 +  private static final int MAX_RETRY = 5;
 +  private static final long RETRY_INTERVAL_MS = 6_000L;
 +  private static final int MAX_PENDING_PIECE_NODE = 5;
 +  private static final Logger logger = LoggerFactory.getLogger(TsFileSplitSender.class);
 +
 +  private LoadTsFileNode loadTsFileNode;
 +  private DataPartitionBatchFetcher targetPartitionFetcher;
 +  private long targetPartitionInterval;
 +  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
 +      internalServiceClientManager;
 +  // All consensus groups accessed in Phase1 should be notified in Phase2
 +  private final Set<TRegionReplicaSet> allReplicaSets = new ConcurrentSkipListSet<>();
 +  private String uuid;
 +  private LocationStatistics locationStatistics = new LocationStatistics();
 +  private boolean isGeneratedByPipe;
 +  private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception> phaseOneFailures =
 +      new ConcurrentHashMap<>();
 +  private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>();
 +  private long maxSplitSize;
 +  private PieceNodeSplitter pieceNodeSplitter = new ClusteringMeasurementSplitter(1.0, 10);
- //        private PieceNodeSplitter pieceNodeSplitter = new OrderedMeasurementSplitter();
++  // private PieceNodeSplitter pieceNodeSplitter = new OrderedMeasurementSplitter();
 +  private CompressionType compressionType = CompressionType.LZ4;
 +  private Statistic statistic = new Statistic();
 +  private ExecutorService splitNodeService;
 +  private Queue<Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet>> splitFutures;
 +  private int maxConcurrentFileNum;
++  private String userName;
 +
 +  public TsFileSplitSender(
 +      LoadTsFileNode loadTsFileNode,
 +      DataPartitionBatchFetcher targetPartitionFetcher,
 +      long targetPartitionInterval,
 +      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
 +      boolean isGeneratedByPipe,
 +      long maxSplitSize,
-       int maxConcurrentFileNum) {
++      int maxConcurrentFileNum,
++      String userName) {
 +    this.loadTsFileNode = loadTsFileNode;
 +    this.targetPartitionFetcher = targetPartitionFetcher;
 +    this.targetPartitionInterval = targetPartitionInterval;
 +    this.internalServiceClientManager = internalServiceClientManager;
 +    this.isGeneratedByPipe = isGeneratedByPipe;
 +    this.maxSplitSize = maxSplitSize;
 +    this.splitNodeService = IoTDBThreadPoolFactory.newCachedThreadPool("SplitLoadTsFilePieceNode");
 +    this.splitFutures = new ArrayDeque<>(MAX_PENDING_PIECE_NODE);
 +    this.maxConcurrentFileNum = maxConcurrentFileNum;
++    this.userName = userName;
 +  }
 +
 +  public void start() throws IOException {
 +    statistic.taskStartTime = System.currentTimeMillis();
 +    // skip files without data
 +    loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
 +    uuid = UUID.randomUUID().toString();
 +
 +    boolean isFirstPhaseSuccess = firstPhase(loadTsFileNode);
 +    boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess);
 +    if (isFirstPhaseSuccess && isSecondPhaseSuccess) {
 +      logger.info("Load TsFiles {} Successfully", loadTsFileNode.getResources());
 +    } else {
 +      logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
 +    }
 +    statistic.taskEndTime = System.currentTimeMillis();
 +    locationStatistics.logLocationStatistics();
 +    statistic.logStatistic();
 +  }
 +
 +  private boolean firstPhase(LoadTsFileNode node) throws IOException {
 +    long start = System.currentTimeMillis();
 +    TsFileDataManager tsFileDataManager =
 +        new DeviceBatchTsFileDataManager(
 +            this::dispatchOnePieceNode,
 +            node.getPlanNodeId(),
 +            node.lastResource().getTsFile(),
 +            targetPartitionFetcher,
-             maxSplitSize);
++            maxSplitSize,
++            userName);
 +
 +    ExecutorService executorService =
 +        IoTDBThreadPoolFactory.newThreadPool(
 +            32,
 +            Integer.MAX_VALUE,
 +            20,
 +            TimeUnit.SECONDS,
 +            new SynchronousQueue<>(),
 +            new IoTThreadFactory("MergedTsFileSplitter"),
 +            "MergedTsFileSplitter");
 +    MergedTsFileSplitter splitter =
 +        new MergedTsFileSplitter(
 +            node.getResources().stream()
 +                .map(TsFileResource::getTsFile)
 +                .collect(Collectors.toList()),
 +            tsFileDataManager::addOrSendTsFileData,
 +            executorService,
 +            targetPartitionInterval,
 +            maxConcurrentFileNum);
 +    splitter.splitTsFileByDataPartition();
 +    splitter.close();
 +    logger.info("Split ends after {}ms", System.currentTimeMillis() - start);
-     boolean success = tsFileDataManager.sendAllTsFileData() && processRemainingPieceNodes()
-         && phaseOneFailures.isEmpty();
++    boolean success =
++        tsFileDataManager.sendAllTsFileData()
++            && processRemainingPieceNodes()
++            && phaseOneFailures.isEmpty();
 +    logger.info("Cleanup ends after {}ms", System.currentTimeMillis() - start);
 +    return success;
 +  }
 +
 +  private boolean secondPhase(boolean isFirstPhaseSuccess) {
 +    logger.info("Start dispatching Load command for uuid {}", uuid);
 +    TLoadCommandReq loadCommandReq =
 +        new TLoadCommandReq(
 +            (isFirstPhaseSuccess ? LoadCommand.EXECUTE : LoadCommand.ROLLBACK).ordinal(), uuid);
 +    loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
 +
 +    for (TRegionReplicaSet replicaSet : allReplicaSets) {
 +      loadCommandReq.setUseConsensus(true);
 +      for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
 +        TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
 +        Exception groupException = null;
 +        loadCommandReq.setConsensusGroupId(replicaSet.getRegionId());
 +
 +        for (int i = 0; i < MAX_RETRY; i++) {
 +          try (SyncDataNodeInternalServiceClient client =
 +              internalServiceClientManager.borrowClient(endPoint)) {
 +            TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
 +            if (!loadResp.isAccepted()) {
 +              logger.warn(loadResp.message);
 +              groupException = new FragmentInstanceDispatchException(loadResp.status);
 +            }
 +            break;
 +          } catch (ClientManagerException | TException e) {
 +            logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
 +            TSStatus status = new TSStatus();
 +            status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
 +            status.setMessage(
 +                "Cannot connect to node " + endPoint
 +                    + ", please set a longer dn_connection_timeout_ms "
 +                    + "in iotdb-common.properties and restart IoTDB.");
 +            groupException = new FragmentInstanceDispatchException(status);
 +          }
 +          try {
 +            Thread.sleep(RETRY_INTERVAL_MS);
 +          } catch (InterruptedException e) {
 +            groupException = e;
 +            break;
 +          }
 +        }
 +
 +        if (groupException != null) {
 +          phaseTwoFailures.put(replicaSet, groupException);
 +        } else {
 +          break;
 +        }
 +      }
 +    }
 +
 +    return phaseTwoFailures.isEmpty();
 +  }
 +
 +  public LocationSequencer createLocationSequencer(TRegionReplicaSet replicaSet) {
 +    //    return new FixedLocationSequencer(replicaSet);
 +    //    return new RandomLocationSequencer(replicaSet);
 +    return new ThroughputBasedLocationSequencer(replicaSet, locationStatistics);
 +  }
 +
 +  private ByteBuffer compressBuffer(ByteBuffer buffer) throws IOException {
 +    statistic.rawSize.addAndGet(buffer.remaining());
 +    if (compressionType.equals(CompressionType.UNCOMPRESSED)) {
 +      statistic.compressedSize.addAndGet(buffer.remaining());
 +      return buffer;
 +    }
 +    ICompressor compressor = ICompressor.getCompressor(compressionType);
 +    int maxBytesForCompression = compressor.getMaxBytesForCompression(buffer.remaining()) + 1;
 +    ByteBuffer compressed = ByteBuffer.allocate(maxBytesForCompression);
-     int compressLength = compressor.compress(buffer.array(),
-         buffer.arrayOffset() + buffer.position(), buffer.remaining(), compressed.array());
++    int compressLength =
++        compressor.compress(
++            buffer.array(),
++            buffer.arrayOffset() + buffer.position(),
++            buffer.remaining(),
++            compressed.array());
 +    compressed.limit(compressLength);
 +    statistic.compressedSize.addAndGet(compressLength);
 +    return compressed;
 +  }
 +
 +  private Future<List<LoadTsFilePieceNode>> submitSplitPieceNode(LoadTsFilePieceNode pieceNode) {
 +    return splitNodeService.submit(() -> pieceNodeSplitter.split(pieceNode));
 +  }
 +
 +  private boolean processRemainingPieceNodes() {
 +    List<LoadTsFilePieceNode> subNodes;
 +    for (Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair : splitFutures) {
 +      try {
 +        subNodes = pair.left.get();
 +      } catch (InterruptedException | ExecutionException e) {
 +        logger.error("Unexpected error during splitting node", e);
 +        return false;
 +      }
 +      if (!dispatchPieceNodes(subNodes, pair.right)) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +
-   private boolean dispatchPieceNodes(List<LoadTsFilePieceNode> subNodes,
-       TRegionReplicaSet replicaSet) {
++  private boolean dispatchPieceNodes(
++      List<LoadTsFilePieceNode> subNodes, TRegionReplicaSet replicaSet) {
 +    AtomicLong minDispatchTime = new AtomicLong(Long.MAX_VALUE);
 +    AtomicLong maxDispatchTime = new AtomicLong(Long.MIN_VALUE);
 +    AtomicLong sumDispatchTime = new AtomicLong();
 +    AtomicLong sumCompressingTime = new AtomicLong();
 +
 +    long start = System.nanoTime();
 +    List<Boolean> subNodeResults =
 +        subNodes.stream()
 +            .parallel()
 +            .map(
 +                node -> {
 +                  ByteBuffer buffer;
 +                  long startTime = System.nanoTime();
 +                  int uncompressedLength;
 +                  try {
 +                    buffer = node.serializeToByteBuffer();
 +                    uncompressedLength = buffer.remaining();
 +                    buffer = compressBuffer(buffer);
 +                  } catch (IOException e) {
 +                    phaseOneFailures.put(new Pair<>(node, replicaSet), e);
 +                    return false;
 +                  }
 +                  long compressingTime = System.nanoTime() - startTime;
 +                  sumCompressingTime.addAndGet(compressingTime);
 +                  statistic.compressingTime.addAndGet(compressingTime);
 +
 +                  TTsFilePieceReq loadTsFileReq =
 +                      new TTsFilePieceReq(buffer, uuid, replicaSet.getRegionId());
 +                  loadTsFileReq.isRelay = true;
 +                  loadTsFileReq.setCompressionType(compressionType.serialize());
 +                  loadTsFileReq.setUncompressedLength(uncompressedLength);
 +                  LocationSequencer locationSequencer = createLocationSequencer(replicaSet);
 +
 +                  boolean loadSucceed = false;
 +                  Exception lastConnectionError = null;
 +                  TDataNodeLocation currLocation = null;
 +                  for (TDataNodeLocation location : locationSequencer) {
 +                    if (location.getDataNodeId() == 0 && logger.isDebugEnabled()) {
 +                      locationStatistics.logLocationStatistics();
 +                      logger.debug("Chose location {}", location.getDataNodeId());
 +                    }
 +                    currLocation = location;
 +                    startTime = System.nanoTime();
 +                    for (int i = 0; i < MAX_RETRY; i++) {
 +                      try (SyncDataNodeInternalServiceClient client =
 +                          internalServiceClientManager.borrowClient(
 +                              currLocation.internalEndPoint)) {
 +                        TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
 +                        logger.debug("Response from {}: {}", location.getDataNodeId(), loadResp);
 +                        if (!loadResp.isAccepted()) {
 +                          logger.warn(loadResp.message);
 +                          phaseOneFailures.put(
 +                              new Pair<>(node, replicaSet),
 +                              new FragmentInstanceDispatchException(loadResp.status));
 +                          return false;
 +                        }
 +                        loadSucceed = true;
 +                        break;
 +                      } catch (ClientManagerException | TException e) {
 +                        lastConnectionError = e;
 +                      }
 +
 +                      try {
 +                        Thread.sleep(RETRY_INTERVAL_MS);
 +                      } catch (InterruptedException e) {
 +                        return false;
 +                      }
 +                    }
 +
 +                    if (loadSucceed) {
 +                      break;
 +                    }
 +                  }
 +
 +                  if (!loadSucceed) {
 +                    logger.warn(NODE_CONNECTION_ERROR, currLocation, lastConnectionError);
 +                    TSStatus status = new TSStatus();
 +                    status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
 +                    status.setMessage("Cannot connect to node " + currLocation);
 +                    phaseOneFailures.put(
 +                        new Pair<>(node, replicaSet),
 +                        new FragmentInstanceDispatchException(status));
 +                    return false;
 +                  }
 +                  long timeConsumption = System.nanoTime() - startTime;
 +                  logger.debug("Time consumption: {}", timeConsumption);
 +                  locationStatistics.updateThroughput(
 +                      currLocation, node.getDataSize(), timeConsumption);
 +
 +                  maxDispatchTime.accumulateAndGet(timeConsumption, Math::max);
 +                  minDispatchTime.accumulateAndGet(timeConsumption, Math::min);
 +                  sumDispatchTime.addAndGet(timeConsumption);
 +                  return true;
 +                })
 +            .collect(Collectors.toList());
 +    long elapsedTime = System.nanoTime() - start;
 +    statistic.dispatchNodesTime.addAndGet(elapsedTime);
 +    logger.debug(
 +        "Dispatched one node after {}ms, min/maxDispatching time: {}/{}ns, avg: {}ns, sum: {}ns, compressing: {}ns",
-         elapsedTime / 1_000_000L, minDispatchTime.get(),
-         maxDispatchTime.get(), sumDispatchTime.get() / subNodes.size(), sumDispatchTime.get(),
++        elapsedTime / 1_000_000L,
++        minDispatchTime.get(),
++        maxDispatchTime.get(),
++        sumDispatchTime.get() / subNodes.size(),
++        sumDispatchTime.get(),
 +        sumCompressingTime.get());
 +    return !subNodeResults.contains(false);
 +  }
 +
 +  public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
 +    long allStart = System.nanoTime();
 +    allReplicaSets.add(replicaSet);
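 +    // dead branch: splits and dispatches synchronously in the calling thread;
 +    // the pipelined path below is used instead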
 +    if (false) {
 +      long start = System.nanoTime();
 +      List<LoadTsFilePieceNode> split = pieceNodeSplitter.split(pieceNode);
 +      long elapsedTime = System.nanoTime() - start;
 +      statistic.splitTime.addAndGet(elapsedTime);
 +      statistic.pieceNodeNum.incrementAndGet();
-       logger.debug(
-           "{} splits are generated after {}ms", split.size(),
-           elapsedTime / 1_000_000L);
++      logger.debug("{} splits are generated after {}ms", split.size(), elapsedTime / 1_000_000L);
 +      boolean success = dispatchPieceNodes(split, replicaSet);
 +      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
 +      return success;
 +    }
 +
 +    List<LoadTsFilePieceNode> subNodes;
 +    if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
 +      splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), replicaSet));
 +      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
 +      return true;
 +    } else {
 +      long start = System.nanoTime();
 +      Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair = splitFutures.poll();
 +      try {
 +        subNodes = pair.left.get();
 +        long elapsedTime = System.nanoTime() - start;
 +        statistic.splitTime.addAndGet(elapsedTime);
 +        statistic.pieceNodeNum.incrementAndGet();
 +        logger.debug(
-             "{} splits are generated after {}ms", subNodes.size(),
-             elapsedTime / 1_000_000L);
++            "{} splits are generated after {}ms", subNodes.size(), elapsedTime / 1_000_000L);
 +
 +        splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), replicaSet));
 +      } catch (InterruptedException | ExecutionException e) {
 +        logger.error("Unexpected error during splitting node", e);
 +        return false;
 +      }
 +      boolean success = dispatchPieceNodes(subNodes, pair.right);
 +      statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
 +      return success;
 +    }
 +  }
 +
 +  public static class Statistic {
 +
 +    public long taskStartTime;
 +    public long taskEndTime;
 +    public AtomicLong rawSize = new AtomicLong();
 +    public AtomicLong compressedSize = new AtomicLong();
 +    public AtomicLong splitTime = new AtomicLong();
 +    public AtomicLong pieceNodeNum = new AtomicLong();
 +    public AtomicLong dispatchNodesTime = new AtomicLong();
 +    public AtomicLong dispatchNodeTime = new AtomicLong();
 +    public AtomicLong compressingTime = new AtomicLong();
 +
 +    public void logStatistic() {
 +      logger.info("Time consumption: {}ms", taskEndTime - taskStartTime);
 +      logger.info(
 +          "Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {}, dispatchNodeTime: {}",
 +          pieceNodeNum.get(),
-           splitTime.get() / 1_000_000L, dispatchNodesTime.get() / 1_000_000L,
++          splitTime.get() / 1_000_000L,
++          dispatchNodesTime.get() / 1_000_000L,
 +          dispatchNodeTime.get() / 1_000_000L);
 +      logger.info(
 +          "Transmission size: {}/{} ({}), compressionTime: {}ms",
 +          compressedSize.get(),
 +          rawSize.get(),
 +          compressedSize.get() * 1.0 / rawSize.get(),
 +          compressingTime.get() / 1_000_000L);
 +    }
 +  }
 +
 +  public Statistic getStatistic() {
 +    return statistic;
 +  }
 +}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 5542e04cf65,55c5bc0aa3a..2cd6190bcf1
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@@ -205,7 -198,8 +205,7 @@@ public class TsFileSplitter 
                        consumeChunkData(measurementId, chunkOffset, chunkData);
                      }
  
-                     timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
+                     timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
 -                    satisfiedLength = 0;
                      endTime =
                          timePartitionSlot.getStartTime()
                              + TimePartitionUtils.getTimePartitionInterval();
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index 3994941c8b3,00000000000..9a4fdf3b96f
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@@ -1,591 -1,0 +1,616 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
 +
++import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
++import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
++import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
++import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
++import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
++import org.apache.iotdb.tsfile.file.header.ChunkHeader;
++import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
++import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
++import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
++import org.apache.iotdb.tsfile.read.common.Chunk;
++
++import net.ricecode.similarity.LevenshteinDistanceStrategy;
++import net.ricecode.similarity.SimilarityStrategy;
++import net.ricecode.similarity.StringSimilarityService;
++import net.ricecode.similarity.StringSimilarityServiceImpl;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.stream.Collectors;
- import net.ricecode.similarity.LevenshteinDistanceStrategy;
- import net.ricecode.similarity.SimilarityStrategy;
- import net.ricecode.similarity.StringSimilarityService;
- import net.ricecode.similarity.StringSimilarityServiceImpl;
- import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
- import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
- import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
- import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
- import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
- import org.apache.iotdb.tsfile.file.header.ChunkHeader;
- import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
- import org.apache.iotdb.tsfile.read.common.Chunk;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
 +
 +public class ClusteringMeasurementSplitter implements PieceNodeSplitter {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(ClusteringMeasurementSplitter.class);
 +  private double groupFactor;
 +  private int maxIteration;
 +  private int dataSampleLength = 32;
 +  private double stdErrThreshold = 1;
 +  private long splitStartTime;
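 +  // budget for splitting and cluster refinement, in milliseconds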
 +  private long splitTimeBudget = 5;
 +  private static final int MAX_CLUSTER_NUM = 500;
 +
 +  public ClusteringMeasurementSplitter(double groupFactor, int maxIteration) {
 +    this.groupFactor = groupFactor;
 +    this.maxIteration = maxIteration;
 +  }
 +
 +  @Override
 +  public List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode) {
 +    splitStartTime = System.currentTimeMillis();
 +    if (pieceNode.isHasModification()) {
 +      // the order of modifications must be preserved, so clustering cannot be
 +      // used when the piece node carries modifications
 +      return new OrderedMeasurementSplitter().split(pieceNode);
 +    }
 +
 +    // split by measurement first
-     Map<String, LoadTsFilePieceNode> measurementPieceNodeMap = new HashMap<>(
-         pieceNode.getAllTsFileData().size());
++    Map<String, LoadTsFilePieceNode> measurementPieceNodeMap =
++        new HashMap<>(pieceNode.getAllTsFileData().size());
 +    for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
 +      ChunkData chunkData = (ChunkData) tsFileData;
 +      String currMeasurement = chunkData.firstMeasurement();
 +      LoadTsFilePieceNode pieceNodeSplit =
 +          measurementPieceNodeMap.computeIfAbsent(
 +              currMeasurement,
 +              m -> new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), pieceNode.getTsFile()));
 +      pieceNodeSplit.addTsFileData(chunkData);
 +    }
 +    // use clustering to merge similar measurements
 +    List<LoadTsFilePieceNode> loadTsFilePieceNodes = clusterPieceNode(measurementPieceNodeMap);
 +    if (logger.isDebugEnabled()) {
-       logger.debug("Split distribution before refinement: {}",
-           loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
++      logger.debug(
++          "Split distribution before refinement: {}",
++          loadTsFilePieceNodes.stream()
++              .map(l -> l.getAllTsFileData().size())
 +              .collect(Collectors.toList()));
 +      refineClusters(loadTsFilePieceNodes);
-       logger.debug("Split distribution after refinement: {}",
-           loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
++      logger.debug(
++          "Split distribution after refinement: {}",
++          loadTsFilePieceNodes.stream()
++              .map(l -> l.getAllTsFileData().size())
 +              .collect(Collectors.toList()));
 +    }
 +    return loadTsFilePieceNodes;
 +  }
 +
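 +  // Rebalances cluster sizes: repeatedly merges the smallest pieces or splits
 +  // the largest one until the size spread (stderr) is acceptable or the time
 +  // budget is exhausted.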
 +  private void refineClusters(List<LoadTsFilePieceNode> pieceNodes) {
-     double average = pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average()
-         .orElse(0.0);
++    double average =
++        pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average().orElse(0.0);
 +    double finalAverage1 = average;
-     double stderr = Math.sqrt(pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
-         .mapToDouble(s -> (s - finalAverage1) * (s - finalAverage1)).sum() / pieceNodes.size());
-     logger.debug("{} splits before refinement, average/stderr: {}/{}", pieceNodes.size(), average,
-         stderr);
++    double stderr =
++        Math.sqrt(
++            pieceNodes.stream()
++                    .mapToLong(LoadTsFilePieceNode::getDataSize)
++                    .mapToDouble(s -> (s - finalAverage1) * (s - finalAverage1))
++                    .sum()
++                / pieceNodes.size());
++    logger.debug(
++        "{} splits before refinement, average/stderr: {}/{}", pieceNodes.size(), average, stderr);
 +
 +    while (stderr > average * stdErrThreshold
 +        && System.currentTimeMillis() - splitStartTime < splitTimeBudget) {
 +      pieceNodes.sort(Comparator.comparingLong(LoadTsFilePieceNode::getDataSize));
 +      LoadTsFilePieceNode smallestPiece = pieceNodes.get(0);
 +      LoadTsFilePieceNode largestPiece = pieceNodes.get(pieceNodes.size() - 1);
 +      double smallDiff = average - smallestPiece.getDataSize();
 +      double largeDiff = largestPiece.getDataSize() - average;
 +      if (smallDiff > largeDiff) {
 +        mergeSmallPiece(pieceNodes);
 +      } else {
 +        if (!splitLargePiece(pieceNodes)) {
 +          // the largest node may not be splittable (only one series), merge the smallest instead
 +          mergeSmallPiece(pieceNodes);
 +        }
 +      }
 +
-       average = pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average()
-           .orElse(0.0);
++      average =
++          pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average().orElse(0.0);
 +      double finalAverage = average;
-       stderr = Math.sqrt(pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
-           .mapToDouble(s -> (s - finalAverage) * (s - finalAverage)).sum() / pieceNodes.size());
++      stderr =
++          Math.sqrt(
++              pieceNodes.stream()
++                      .mapToLong(LoadTsFilePieceNode::getDataSize)
++                      .mapToDouble(s -> (s - finalAverage) * (s - finalAverage))
++                      .sum()
++                  / pieceNodes.size());
 +      logger.debug("{} splits, average/stderr: {}/{}", pieceNodes.size(), average, stderr);
 +    }
-     logger.debug("{} splits after refinement, average/stderr: {}/{}", pieceNodes.size(), average,
-         stderr);
++    logger.debug(
++        "{} splits after refinement, average/stderr: {}/{}", pieceNodes.size(), average, stderr);
 +  }
 +
 +  private void mergeSmallPiece(List<LoadTsFilePieceNode> pieceNodes) {
 +    LoadTsFilePieceNode pieceA = pieceNodes.remove(0);
 +    LoadTsFilePieceNode pieceB = pieceNodes.remove(0);
-     LoadTsFilePieceNode newPiece = new LoadTsFilePieceNode(pieceA.getPlanNodeId(),
-         pieceA.getTsFile());
++    LoadTsFilePieceNode newPiece =
++        new LoadTsFilePieceNode(pieceA.getPlanNodeId(), pieceA.getTsFile());
 +    pieceA.getAllTsFileData().forEach(newPiece::addTsFileData);
 +    pieceB.getAllTsFileData().forEach(newPiece::addTsFileData);
 +    pieceNodes.add(newPiece);
-     pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
-         .average().orElse(0.0);
-     pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
-         .sum();
 +  }
 +
 +  private boolean splitLargePiece(List<LoadTsFilePieceNode> pieceNodes) {
 +    LoadTsFilePieceNode oldPiece = pieceNodes.remove(pieceNodes.size() - 1);
-     LoadTsFilePieceNode newNodeA = new LoadTsFilePieceNode(oldPiece.getPlanNodeId(),
-         oldPiece.getTsFile());
-     LoadTsFilePieceNode newNodeB = new LoadTsFilePieceNode(oldPiece.getPlanNodeId(),
-         oldPiece.getTsFile());
++    LoadTsFilePieceNode newNodeA =
++        new LoadTsFilePieceNode(oldPiece.getPlanNodeId(), oldPiece.getTsFile());
++    LoadTsFilePieceNode newNodeB =
++        new LoadTsFilePieceNode(oldPiece.getPlanNodeId(), oldPiece.getTsFile());
 +    long sizeTarget = oldPiece.getDataSize() / 2;
 +
 +    // cannot break a series into two pieces since the chunk order must be preserved
 +    String currMeasurement = null;
 +    List<TsFileData> allTsFileData = oldPiece.getAllTsFileData();
 +    int i = 0;
 +    for (; i < allTsFileData.size(); i++) {
 +      TsFileData tsFileData = allTsFileData.get(i);
 +      if (tsFileData.isModification()) {
 +        // modifications follow the previous chunk data
 +        newNodeA.addTsFileData(tsFileData);
 +      } else {
 +        ChunkData chunkData = (ChunkData) tsFileData;
 +        if (currMeasurement == null || currMeasurement.equals(chunkData.firstMeasurement())) {
 +          // the first chunk or chunk of the same series, add it to A
 +          currMeasurement = chunkData.firstMeasurement();
 +          newNodeA.addTsFileData(tsFileData);
 +          sizeTarget -= chunkData.getDataSize();
 +        } else {
 +          // chunk of the next series
 +          if (sizeTarget < 0) {
 +            // splitA is full, break to fill splitB
 +            break;
 +          } else {
 +            // splitA is not full, also add this series to A
 +            currMeasurement = chunkData.firstMeasurement();
 +            newNodeA.addTsFileData(tsFileData);
 +            sizeTarget -= chunkData.getDataSize();
 +          }
 +        }
 +      }
 +    }
 +    // add remaining series to B
 +    for (; i < allTsFileData.size(); i++) {
 +      TsFileData tsFileData = allTsFileData.get(i);
 +      newNodeB.addTsFileData(tsFileData);
 +    }
 +
 +    pieceNodes.add(newNodeA);
 +    if (!newNodeB.getAllTsFileData().isEmpty()) {
 +      pieceNodes.add(newNodeB);
 +      return true;
 +    }
 +    return false;
 +  }
 +
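 +  // Groups per-measurement piece nodes by k-means over lightweight series
 +  // features, so that similar series end up in the same piece node.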
 +  private List<LoadTsFilePieceNode> clusterPieceNode(
 +      Map<String, LoadTsFilePieceNode> measurementPieceNodeMap) {
 +    // convert to feature vector
 +    Map<String, SeriesFeatureVector> measurementVectorMap =
 +        new HashMap<>(measurementPieceNodeMap.size());
 +    for (Entry<String, LoadTsFilePieceNode> entry : measurementPieceNodeMap.entrySet()) {
 +      measurementVectorMap.put(entry.getKey(), convertToFeature(entry.getValue()));
 +    }
 +    // normalize
 +    normalize(measurementVectorMap.values());
 +    Map<String, double[]> doubleVectors = new HashMap<>(measurementPieceNodeMap.size());
 +    for (Entry<String, SeriesFeatureVector> e : measurementVectorMap.entrySet()) {
 +      doubleVectors.put(e.getKey(), e.getValue().numericVector);
 +    }
 +    // clustering
 +    int numCluster = Math.min((int) (doubleVectors.size() / groupFactor), MAX_CLUSTER_NUM);
 +    if (numCluster < 1) {
 +      numCluster = 1;
 +    }
 +    VectorDistance distance = new EuclideanDistance();
 +    Clustering clustering = new KMeans(numCluster, maxIteration);
 +    List<List<String>> clusterResult = clustering.cluster(doubleVectors, distance);
 +    // collect result
 +    List<LoadTsFilePieceNode> clusteredNodes = new ArrayList<>();
 +    for (List<String> cluster : clusterResult) {
 +      if (cluster.isEmpty()) {
 +        continue;
 +      }
 +      LoadTsFilePieceNode pieceNode = measurementPieceNodeMap.get(cluster.get(0));
 +      LoadTsFilePieceNode clusterNode =
 +          new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), pieceNode.getTsFile());
 +      for (String measurementId : cluster) {
 +        pieceNode = measurementPieceNodeMap.get(measurementId);
 +        for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
 +          clusterNode.addTsFileData(tsFileData);
 +        }
 +      }
 +      clusteredNodes.add(clusterNode);
 +    }
 +
 +    return clusteredNodes;
 +  }
 +
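 +  // Min-max normalizes the numeric features into [0, 1]; the two string
 +  // features (measurement id and data sample) are scored by Levenshtein
 +  // similarity against the first series, and NaN/Infinity values produced by
 +  // zero-range features are clamped to 0/1.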
 +  private void normalize(Collection<SeriesFeatureVector> vectors) {
 +    String firstMeasurementId = null;
 +    int minDataSize = Integer.MAX_VALUE;
 +    int maxDataSize = Integer.MIN_VALUE;
 +    int minDataType = Integer.MAX_VALUE;
 +    int maxDataType = Integer.MIN_VALUE;
 +    int minCompressionType = Integer.MAX_VALUE;
 +    int maxCompressionType = Integer.MIN_VALUE;
 +    int minEncodingType = Integer.MAX_VALUE;
 +    int maxEncodingType = Integer.MIN_VALUE;
 +    int minNumOfPages = Integer.MAX_VALUE;
 +    int maxNumOfPages = Integer.MIN_VALUE;
 +    String firstDataSample = null;
 +    for (SeriesFeatureVector vector : vectors) {
 +      if (firstMeasurementId == null) {
 +        firstMeasurementId = vector.measurementId;
 +        firstDataSample = vector.dataSample;
 +      }
 +      minDataSize = Math.min(minDataSize, vector.dataSize);
 +      maxDataSize = Math.max(maxDataSize, vector.dataSize);
 +      minDataType = Math.min(minDataType, vector.dataType.ordinal());
 +      maxDataType = Math.max(maxDataType, vector.dataType.ordinal());
 +      minCompressionType = Math.min(minCompressionType, vector.compressionType.ordinal());
 +      maxCompressionType = Math.max(maxCompressionType, vector.compressionType.ordinal());
 +      minEncodingType = Math.min(minEncodingType, vector.encodingType.ordinal());
 +      maxEncodingType = Math.max(maxEncodingType, vector.encodingType.ordinal());
 +      minNumOfPages = Math.min(minNumOfPages, vector.numOfPages);
 +      maxNumOfPages = Math.max(maxNumOfPages, vector.numOfPages);
 +    }
 +
 +    SimilarityStrategy strategy = new LevenshteinDistanceStrategy();
 +    StringSimilarityService service = new StringSimilarityServiceImpl(strategy);
 +    int finalMinDataSize = minDataSize;
 +    String finalFirstMeasurementId = firstMeasurementId;
 +    int finalMaxDataSize = maxDataSize;
 +    int finalMinDataType = minDataType;
 +    int finalMaxDataType = maxDataType;
 +    int finalMinCompressionType = minCompressionType;
 +    int finalMaxCompressionType = maxCompressionType;
 +    int finalMinEncodingType = minEncodingType;
 +    int finalMaxEncodingType = maxEncodingType;
 +    int finalMinNumOfPages = minNumOfPages;
 +    int finalMaxNumOfPages = maxNumOfPages;
 +    String finalFirstDataSample = firstDataSample;
-     vectors.stream().parallel().forEach(vector -> {
-       vector.numericVector[0] = service.score(finalFirstMeasurementId, vector.measurementId);
-       vector.numericVector[1] = (vector.dataSize - finalMinDataSize) * 1.0 / (finalMaxDataSize - finalMinDataSize);
-       vector.numericVector[2] =
-           (vector.dataType.ordinal() - finalMinDataType) * 1.0 / (finalMaxDataType - finalMinDataType);
-       vector.numericVector[3] =
-           (vector.compressionType.ordinal() - finalMinCompressionType)
-               * 1.0
-               / (finalMaxCompressionType - finalMinCompressionType);
-       vector.numericVector[4] =
-           (vector.encodingType.ordinal() - finalMinEncodingType)
-               * 1.0
-               / (finalMaxEncodingType - finalMinEncodingType);
-       vector.numericVector[5] =
-           (vector.numOfPages - finalMinNumOfPages) * 1.0 / (finalMaxNumOfPages - finalMinNumOfPages);
-       vector.numericVector[6] = service.score(finalFirstDataSample, vector.dataSample);
-       double[] numericVector = vector.numericVector;
-       for (int i = 0; i < numericVector.length; i++) {
-         if (Double.isNaN(numericVector[i])) {
-           numericVector[i] = 0.0;
-         } else if (Double.isInfinite(numericVector[i])) {
-           numericVector[i] = 1.0;
-         }
-       }
-     });
++    vectors.stream()
++        .parallel()
++        .forEach(
++            vector -> {
++              vector.numericVector[0] =
++                  service.score(finalFirstMeasurementId, vector.measurementId);
++              vector.numericVector[1] =
++                  (vector.dataSize - finalMinDataSize)
++                      * 1.0
++                      / (finalMaxDataSize - finalMinDataSize);
++              vector.numericVector[2] =
++                  (vector.dataType.ordinal() - finalMinDataType)
++                      * 1.0
++                      / (finalMaxDataType - finalMinDataType);
++              vector.numericVector[3] =
++                  (vector.compressionType.ordinal() - finalMinCompressionType)
++                      * 1.0
++                      / (finalMaxCompressionType - finalMinCompressionType);
++              vector.numericVector[4] =
++                  (vector.encodingType.ordinal() - finalMinEncodingType)
++                      * 1.0
++                      / (finalMaxEncodingType - finalMinEncodingType);
++              vector.numericVector[5] =
++                  (vector.numOfPages - finalMinNumOfPages)
++                      * 1.0
++                      / (finalMaxNumOfPages - finalMinNumOfPages);
++              vector.numericVector[6] = service.score(finalFirstDataSample, vector.dataSample);
++              double[] numericVector = vector.numericVector;
++              for (int i = 0; i < numericVector.length; i++) {
++                if (Double.isNaN(numericVector[i])) {
++                  numericVector[i] = 0.0;
++                } else if (Double.isInfinite(numericVector[i])) {
++                  numericVector[i] = 1.0;
++                }
++              }
++            });
 +  }
 +
 +  private SeriesFeatureVector convertToFeature(LoadTsFilePieceNode pieceNode) {
 +    List<TsFileData> allTsFileData = pieceNode.getAllTsFileData();
 +    SeriesFeatureVector vector = null;
 +    for (TsFileData tsFileData : allTsFileData) {
 +      ChunkData chunkData = (ChunkData) tsFileData;
 +      if (vector == null) {
 +        vector = SeriesFeatureVector.fromChunkData(chunkData, dataSampleLength);
 +      } else {
 +        vector.mergeChunkData(chunkData);
 +      }
 +    }
 +    return vector;
 +  }
 +
 +  public static class SeriesFeatureVector {
 +
 +    private String measurementId;
 +    private int dataSize;
 +    private TSDataType dataType;
 +    private CompressionType compressionType;
 +    private TSEncoding encodingType;
 +    private int numOfPages;
 +    private String dataSample;
 +    private double[] numericVector = new double[7];
 +
 +    public static SeriesFeatureVector fromChunkData(ChunkData data, int dataSampleLength) {
 +      ChunkHeader chunkHeader;
 +      Chunk chunk;
 +      ByteBuffer chunkBuffer;
 +      // sample a buffer from the chunk data
 +      if (data.isAligned()) {
 +        AlignedChunkData alignedChunkData = (AlignedChunkData) data;
 +        chunkHeader = alignedChunkData.getChunkHeaderList().get(0);
 +        if (!alignedChunkData.getChunkList().isEmpty()) {
 +          chunk = alignedChunkData.getChunkList().get(0);
 +          ByteBuffer buffer = chunk.getData();
 +          int sampleLength = Math.min(dataSampleLength, buffer.remaining());
 +          chunkBuffer = buffer.slice();
 +          chunkBuffer.limit(sampleLength);
 +        } else {
 +          chunkBuffer = ByteBuffer.wrap(alignedChunkData.getByteStream().getBuf());
 +          int sampleLength = Math.min(dataSampleLength, chunkBuffer.remaining());
 +          chunkBuffer.limit(sampleLength);
 +        }
 +      } else {
 +        NonAlignedChunkData nonAlignedChunkData = (NonAlignedChunkData) data;
 +        chunkHeader = nonAlignedChunkData.getChunkHeader();
 +        chunk = nonAlignedChunkData.getChunk();
 +        if (chunk != null) {
 +          ByteBuffer buffer = chunk.getData();
 +          int sampleLength = Math.min(dataSampleLength, buffer.remaining());
 +          chunkBuffer = buffer.slice();
 +          chunkBuffer.limit(sampleLength);
 +        } else {
 +          chunkBuffer = ByteBuffer.wrap(nonAlignedChunkData.getByteStream().getBuf());
 +          int sampleLength = Math.min(dataSampleLength, chunkBuffer.remaining());
 +          chunkBuffer.limit(sampleLength);
 +        }
 +      }
 +
 +      SeriesFeatureVector vector = new SeriesFeatureVector();
 +      vector.measurementId = chunkHeader.getMeasurementID();
 +      vector.dataSize = chunkHeader.getDataSize();
 +      vector.dataType = chunkHeader.getDataType();
 +      vector.compressionType = chunkHeader.getCompressionType();
 +      vector.encodingType = chunkHeader.getEncodingType();
 +      vector.numOfPages = chunkHeader.getNumOfPages();
 +      vector.dataSample =
 +          new String(
 +              chunkBuffer.array(),
 +              chunkBuffer.arrayOffset() + chunkBuffer.position(),
 +              chunkBuffer.remaining());
 +      return vector;
 +    }
 +
 +    public void mergeChunkData(ChunkData data) {
 +      ChunkHeader chunkHeader;
 +      if (data.isAligned()) {
 +        AlignedChunkData alignedChunkData = (AlignedChunkData) data;
 +        chunkHeader = alignedChunkData.getChunkHeaderList().get(0);
 +      } else {
 +        NonAlignedChunkData nonAlignedChunkData = (NonAlignedChunkData) data;
 +        chunkHeader = nonAlignedChunkData.getChunkHeader();
 +      }
 +      dataSize += chunkHeader.getDataSize();
 +      numOfPages += chunkHeader.getNumOfPages();
 +    }
 +  }
 +
 +  public interface VectorDistance {
 +
 +    double calculate(double[] v1, double[] v2);
 +  }
 +
 +  public static class EuclideanDistance implements VectorDistance {
 +
 +    @Override
 +    public double calculate(double[] v1, double[] v2) {
 +      double sum = 0;
 +      for (int i = 0; i < v1.length; i++) {
 +        sum += (v1[i] - v2[i]) * (v1[i] - v2[i]);
 +      }
 +      return Math.sqrt(sum) + 1;
 +    }
 +  }
 +
 +  public interface Clustering {
 +
 +    List<List<String>> cluster(Map<String, double[]> tagVectorMap, VectorDistance distance);
 +  }
 +
 +  public class KMeans implements Clustering {
 +
 +    private int k;
 +    private int maxIteration;
 +    private double[][] centroids;
 +    private AtomicInteger[] centroidCounters;
 +    private Random random;
 +    private Map<Entry<String, double[]>, Integer> recordCentroidMapping;
 +    private int vecLength = 0;
 +
 +    public KMeans(int k, int maxIteration) {
 +      this.k = k;
 +      this.maxIteration = maxIteration;
 +      this.centroids = new double[k][];
 +      this.centroidCounters = new AtomicInteger[k];
 +      for (int i = 0; i < centroidCounters.length; i++) {
 +        centroidCounters[i] = new AtomicInteger();
 +      }
 +      this.random = new Random();
 +      this.recordCentroidMapping = new ConcurrentHashMap<>();
 +    }
 +
 +    private void clearCentroidCounter() {
 +      for (AtomicInteger centroidCounter : centroidCounters) {
 +        centroidCounter.set(0);
 +      }
 +    }
 +
 +    @Override
 +    public List<List<String>> cluster(Map<String, double[]> tagVectorMap, VectorDistance distance) {
 +      recordCentroidMapping.clear();
 +      if (k > tagVectorMap.size()) {
 +        k = tagVectorMap.size();
 +        this.centroids = new double[k][];
 +      }
 +
 +      for (Entry<String, double[]> entry : tagVectorMap.entrySet()) {
 +        vecLength = entry.getValue().length;
 +      }
 +
 +      randomCentroid(vecLength, tagVectorMap);
 +
 +      for (int i = 0; i < maxIteration; i++) {
 +        if (!assignCentroid(tagVectorMap, distance)) {
 +          break;
 +        }
 +        newCentroid();
 +        clearCentroidCounter();
 +        if (System.currentTimeMillis() - splitStartTime > splitTimeBudget) {
 +          break;
 +        }
 +      }
 +
 +      Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
 +          collectCentroidRecordMapping();
 +      return centroidRecordMap.values().stream()
 +          .map(l -> l.stream().map(Entry::getKey).collect(Collectors.toList()))
 +          .collect(Collectors.toList());
 +    }
 +
 +    private Map<Integer, List<Entry<String, double[]>>> collectCentroidRecordMapping() {
 +      Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping = new ConcurrentHashMap<>();
 +      recordCentroidMapping.entrySet().stream()
 +          .parallel()
 +          .forEach(
 +              e ->
 +                  centroidRecordMapping.compute(
 +                      e.getValue(),
 +                      (key, oldV) -> {
 +                        if (oldV == null) {
 +                          oldV = new ArrayList<>();
 +                        }
 +                        oldV.add(e.getKey());
 +                        return oldV;
 +                      }));
 +      return centroidRecordMapping;
 +    }
 +
 +    private void newCentroid() {
 +      Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping =
 +          collectCentroidRecordMapping();
 +      centroidRecordMapping.entrySet().stream()
 +          .parallel()
 +          .forEach(
 +              e -> {
 +                Integer centroidId = e.getKey();
 +                List<Entry<String, double[]>> records = e.getValue();
 +                int recordNum = records.size();
 +                double[] sumVec = new double[vecLength];
 +                for (Entry<String, double[]> record : records) {
 +                  for (int i = 0; i < sumVec.length; i++) {
 +                    sumVec[i] += record.getValue()[i];
 +                  }
 +                }
 +                for (int i = 0; i < sumVec.length; i++) {
 +                  sumVec[i] = sumVec[i] / recordNum;
 +                }
 +                centroids[centroidId] = sumVec;
 +              });
 +    }
 +
 +    private boolean assignCentroid(Map<String, double[]> tagVectorMap, VectorDistance distance) {
 +      AtomicBoolean centroidUpdated = new AtomicBoolean(false);
 +      tagVectorMap.entrySet().stream()
 +          .parallel()
 +          .forEach(
 +              e -> {
 +                double[] vector = e.getValue();
 +                double minDist = Double.MAX_VALUE;
 +                int nearestCentroid = 0;
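 +                // distances are inflated for crowded centroids to keep
 +                // cluster sizes balanced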
 +                for (int i = 0; i < centroids.length; i++) {
 +                  double dist =
-                       distance.calculate(vector, centroids[i]) * (1
-                           + centroidCounters[i].get() * 0.1);
++                      distance.calculate(vector, centroids[i])
++                          * (1 + centroidCounters[i].get() * 0.1);
 +                  if (dist < minDist) {
 +                    minDist = dist;
 +                    nearestCentroid = i;
 +                  }
 +                }
 +
 +                centroidCounters[nearestCentroid].incrementAndGet();
 +                int finalNearestCentroid = nearestCentroid;
 +                // compute() both records the new assignment and detects
 +                // whether it changed since the previous iteration
 +                recordCentroidMapping.compute(
 +                    e,
 +                    (t, oc) -> {
 +                      if (oc == null || oc != finalNearestCentroid) {
 +                        centroidUpdated.set(true);
 +                        return finalNearestCentroid;
 +                      } else {
 +                        return oc;
 +                      }
 +                    });
 +              });
 +      return centroidUpdated.get();
 +    }
 +
 +    private void randomCentroid(int vecLength, Map<String, double[]> tagVectorMap) {
 +      pickRandomCentroid(tagVectorMap);
 +      // genRandomCentroid(vecLength) is an alternative that draws random centroids
 +    }
 +
 +    private void pickRandomCentroid(Map<String, double[]> tagVectorMap) {
 +      List<double[]> recordVectors = new ArrayList<>(tagVectorMap.values());
 +      Collections.shuffle(recordVectors);
 +      for (int i = 0; i < k; i++) {
 +        centroids[i] = recordVectors.get(i);
 +      }
 +    }
 +
 +    private void genRandomCentroid(int vecLength) {
 +      for (int i = 0; i < k; i++) {
 +        double[] centroid = new double[vecLength];
 +        for (int j = 0; j < vecLength; j++) {
 +          centroid[j] = random.nextDouble();
 +        }
 +        centroids[i] = centroid;
 +      }
 +    }
 +  }
 +}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 2eb3500eaa8,a1cf6f83749..48423aa5f92
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@@ -186,15 -194,9 +189,16 @@@ public class LoadTsFileScheduler implem
  
    private boolean firstPhase(LoadSingleTsFileNode node) {
      try {
 -      TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node);
 +      TsFileDataManager tsFileDataManager =
 +          new TsFileDataManager(
 +              this::dispatchOnePieceNode,
 +              node.getPlanNodeId(),
 +              node.getTsFileResource().getTsFile(),
 +              partitionFetcher,
-               MAX_MEMORY_SIZE);
++              MAX_MEMORY_SIZE,
++              queryContext.getSession().getUserName());
        new TsFileSplitter(
 -              node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
 +              node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData, 0)
            .splitTsFileByDataPartition();
        if (!tsFileDataManager.sendAllTsFileData()) {
          stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 046c52794fb,00000000000..2e2db3dd13e
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@@ -1,199 -1,0 +1,205 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
- import static org.junit.Assert.assertEquals;
- 
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.util.Collections;
- import java.util.List;
 +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 +import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.common.rpc.thrift.TSStatus;
 +import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 +import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
 +import org.apache.iotdb.db.exception.DataRegionException;
 +import org.apache.iotdb.db.exception.LoadFileException;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 +import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 +import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 +import org.apache.iotdb.rpc.TSStatusCode;
 +import org.apache.iotdb.tsfile.compress.IUnCompressor;
 +import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.read.TsFileReader;
 +import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 +import org.apache.iotdb.tsfile.read.common.Path;
 +import org.apache.iotdb.tsfile.read.common.RowRecord;
 +import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
++
 +import org.apache.thrift.TException;
 +import org.junit.Test;
 +
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.Collections;
++import java.util.List;
++
++import static org.junit.Assert.assertEquals;
++
 +public class LoadTsFileManagerTest extends TestBase {
 +
 +  private LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
 +  private long maxSplitSize = 128 * 1024 * 1024;
 +
 +  @Override
 +  public void setup() throws IOException, WriteProcessException, DataRegionException {
 +    fileNum = 10;
 +    seriesNum = 100;
 +    deviceNum = 100;
 +    super.setup();
 +  }
 +
 +  @Test
 +  public void test() throws IOException {
 +
 +    LoadTsFileNode loadTsFileNode =
 +        new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
 +    DataPartitionBatchFetcher partitionBatchFetcher = dummyDataPartitionBatchFetcher();
 +    TsFileSplitSender splitSender =
 +        new TsFileSplitSender(
 +            loadTsFileNode,
 +            partitionBatchFetcher,
 +            TimePartitionUtils.getTimePartitionInterval(),
 +            internalServiceClientManager,
 +            false,
 +            maxSplitSize,
-             100);
++            100,
++            "root");
 +    long start = System.currentTimeMillis();
 +    splitSender.start();
 +    long timeConsumption = System.currentTimeMillis() - start;
 +
 +    System.out.printf("Split ends after %dms\n", timeConsumption);
 +
 +    ConsensusGroupId d1GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
 +    DataRegion dataRegion = dataRegionMap.get(d1GroupId.convertToTConsensusGroupId());
 +    List<TsFileResource> tsFileList = dataRegion.getTsFileManager().getTsFileList(false);
 +    // all input files should be shallow-merged into one
 +    System.out.printf("Loaded TsFiles: %s\n", tsFileList);
 +    assertEquals(1, tsFileList.size());
 +
 +    long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
 +    long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
 +    int chunkPointNum = (int) (chunkTimeRange / pointInterval);
 +    long endTime = chunkTimeRange * (fileNum - 1) + pointInterval * (chunkPointNum - 1);
 +    // check device time
 +    TsFileResource tsFileResource = tsFileList.get(0);
 +    for (int i = 0; i < deviceNum; i++) {
 +      assertEquals(0, tsFileResource.getStartTime("d" + i));
 +      assertEquals(endTime, tsFileResource.getEndTime("d" + i));
 +    }
 +
 +    // read and check the generated file
-     try (TsFileReader reader = new TsFileReader(
-         new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
++    try (TsFileReader reader =
++        new TsFileReader(new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
 +      for (int dn = 0; dn < deviceNum; dn++) {
 +        // "Simple_" is generated with linear function and is easy to check
-         QueryExpression queryExpression = QueryExpression.create(
-             Collections.singletonList(new Path("d" + dn, "Simple_22", false)), null);
++        QueryExpression queryExpression =
++            QueryExpression.create(
++                Collections.singletonList(new Path("d" + dn, "Simple_22", false)), null);
 +        QueryDataSet dataSet = reader.query(queryExpression);
 +        int i = 0;
 +        while (dataSet.hasNext()) {
 +          RowRecord record = dataSet.next();
 +          long currTime =
 +              chunkTimeRange * (i / chunkPointNum) + pointInterval * (i % chunkPointNum);
 +          assertEquals(currTime, record.getTimestamp());
 +          assertEquals(1.0 * (i % chunkPointNum), record.getFields().get(0).getDoubleV(), 0.0001);
 +          i++;
 +        }
 +        assertEquals(chunkPointNum * fileNum, i);
 +      }
 +    }
 +  }
 +
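 +  // Test-side stand-in for the DataNode RPC handler: decompresses the piece,
 +  // writes it to the target region, and relays it to the other replicas.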
 +  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
 +      throws TException, IOException {
 +    ConsensusGroupId groupId =
 +        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
 +    ByteBuffer buf = req.body.slice();
 +    if (req.isSetCompressionType()) {
 +      CompressionType compressionType = CompressionType.deserialize(req.compressionType);
 +      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
 +      int uncompressedLength = req.getUncompressedLength();
 +      ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
-       unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(),
-           allocate.array(), 0);
++      unCompressor.uncompress(
++          buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(), allocate.array(), 0);
 +      allocate.limit(uncompressedLength);
 +      buf = allocate;
 +    }
 +
 +    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
-     loadTsFileManager.writeToDataRegion(dataRegionMap.get(req.consensusGroupId), pieceNode,
-         req.uuid);
++    loadTsFileManager.writeToDataRegion(
++        dataRegionMap.get(req.consensusGroupId), pieceNode, req.uuid);
 +
 +    // forward to other replicas in the group
 +    if (req.isRelay) {
 +      req.isRelay = false;
 +      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
-       regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
-         TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
-         if (!otherPoint.equals(tEndpoint)) {
-           try {
-             handleTsFilePieceNode(req, otherPoint);
-           } catch (TException | IOException e) {
-             throw new RuntimeException(e);
-           }
-         }
-       });
++      regionReplicaSet.getDataNodeLocations().stream()
++          .parallel()
++          .forEach(
++              dataNodeLocation -> {
++                TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
++                if (!otherPoint.equals(tEndpoint)) {
++                  try {
++                    handleTsFilePieceNode(req, otherPoint);
++                  } catch (TException | IOException e) {
++                    throw new RuntimeException(e);
++                  }
++                }
++              });
 +    }
 +
 +    return new TLoadResp()
 +        .setAccepted(true)
 +        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 +  }
 +
 +  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint)
 +      throws LoadFileException, IOException {
 +    ConsensusGroupId groupId =
 +        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
 +
 +    // forward to other replicas in the group
 +    if (req.useConsensus) {
 +      req.useConsensus = false;
 +      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
 +      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
 +        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
 +        if (!otherPoint.equals(tEndpoint)) {
 +          handleTsLoadCommand(req, otherPoint);
 +        }
 +      }
 +    }
 +
 +    loadTsFileManager.loadAll(req.uuid, false);
 +
 +    return new TLoadResp()
 +        .setAccepted(true)
 +        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 +  }
- 
 +}
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
index 71efb778a47,00000000000..c675a9f02f8
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@@ -1,77 -1,0 +1,77 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
 +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 +import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
 +
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class MergedTsFileSplitterTest extends TestBase {
 +
 +  private List<TsFileData> resultSet = new ArrayList<>();
 +
 +  @Test
 +  public void testSplit() throws IOException {
 +    long start = System.currentTimeMillis();
 +    MergedTsFileSplitter splitter =
 +        new MergedTsFileSplitter(
 +            files,
 +            this::consumeSplit,
 +            IoTDBThreadPoolFactory.newThreadPool(
 +                16,
 +                Integer.MAX_VALUE,
 +                20,
 +                TimeUnit.SECONDS,
 +                new SynchronousQueue<>(),
 +                new IoTThreadFactory("MergedTsFileSplitter"),
 +                "MergedTsFileSplitter"),
 +            TimePartitionUtils.getTimePartitionInterval(),
 +            fileNum);
 +    try {
 +      splitter.splitTsFileByDataPartition();
 +      // uncomment to inspect individual splits:
 +      // resultSet.forEach(System.out::println);
 +    } finally {
 +      splitter.close();
 +    }
 +    System.out.printf(
 +        "%d splits after %dms\n", resultSet.size(), System.currentTimeMillis() - start);
 +    assertEquals(resultSet.size(), expectedChunkNum());
 +  }
 +
 +  public boolean consumeSplit(TsFileData data) {
 +    resultSet.add(data);
 +    if (resultSet.size() % 1000 == 0) {
 +      System.out.printf("%d chunks split\n", resultSet.size());
 +    }
 +    return true;
 +  }
 +}
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index a8e7ee3b2bf,00000000000..de0bf3c9590
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@@ -1,438 -1,0 +1,431 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
- import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
- 
- import java.util.Comparator;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.atomic.AtomicLong;
 +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 +import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.common.rpc.thrift.TSStatus;
 +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 +import org.apache.iotdb.commons.client.ClientManager;
 +import org.apache.iotdb.commons.client.ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory;
 +import org.apache.iotdb.commons.client.IClientManager;
 +import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
++import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 +import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 +import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
 +import org.apache.iotdb.commons.partition.DataPartition;
 +import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 +import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 +import org.apache.iotdb.commons.partition.SchemaPartition;
 +import org.apache.iotdb.commons.path.PathPatternTree;
 +import org.apache.iotdb.commons.utils.FileUtils;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
 +import org.apache.iotdb.db.exception.DataRegionException;
 +import org.apache.iotdb.db.exception.LoadFileException;
 +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 +import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
- import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 +import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
 +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 +import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
 +import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGenerator;
 +import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGeneratorFactory;
 +import org.apache.iotdb.db.utils.SequenceUtils.GaussianDoubleSequenceGenerator;
 +import org.apache.iotdb.db.utils.SequenceUtils.SimpleDoubleSequenceGenerator;
 +import org.apache.iotdb.db.utils.SequenceUtils.UniformDoubleSequenceGenerator;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 +import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 +import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 +import org.apache.iotdb.rpc.TSStatusCode;
 +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 +import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 +import org.apache.iotdb.tsfile.read.common.Path;
- import org.apache.iotdb.tsfile.utils.BytesUtils;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 +import org.apache.iotdb.tsfile.write.TsFileWriter;
 +import org.apache.iotdb.tsfile.write.record.Tablet;
 +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 +
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TBinaryProtocol;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.apache.thrift.transport.TByteBuffer;
 +import org.apache.thrift.transport.TTransportException;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
++import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
++import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
++
 +public class TestBase {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
 +  public static final String BASE_OUTPUT_PATH = "target".concat(File.separator).concat("loadTest");
 +  public static final String PARTIAL_PATH_STRING =
 +      "%s" + File.separator + "%d" + File.separator + "%d" + File.separator;
 +  public static final String TEST_TSFILE_PATH =
 +      BASE_OUTPUT_PATH + File.separator + "testTsFile".concat(File.separator) + PARTIAL_PATH_STRING;
 +
 +  protected int fileNum = 100;
 +  // number of series per file: sn non-aligned series plus one aligned series with sn measurements
 +  protected int seriesNum = 1000;
 +  protected int deviceNum = 100;
 +  // time range of each chunk as a fraction of the time partition interval; each series has one
 +  // chunk per file
 +  protected double chunkTimeRangeRatio = 0.001;
 +  // the interval between two consecutive points of a series
 +  protected long pointInterval = 50_000;
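++  // With these defaults: 100 files, 1000 measurements registered under each of
++  // 100 devices, and every chunk spanning 0.001 of a time partition interval.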
-   protected List<Pair<String, DoubleSequenceGeneratorFactory>> measurementSequenceGeneratorPairs = Arrays.asList(
-       new Pair<>("Simple_", new SimpleDoubleSequenceGenerator.Factory()),
-       new Pair<>("UniformA_", new UniformDoubleSequenceGenerator.Factory(1.0)),
-       new Pair<>("GaussianA_", new GaussianDoubleSequenceGenerator.Factory(1.0, 1.0)),
-       new Pair<>("UniformB_", new UniformDoubleSequenceGenerator.Factory(10.0)),
-       new Pair<>("GaussianB_", new GaussianDoubleSequenceGenerator.Factory(15.0, 3.0))
-   );
++  protected List<Pair<String, DoubleSequenceGeneratorFactory>> measurementSequenceGeneratorPairs =
++      Arrays.asList(
++          new Pair<>("Simple_", new SimpleDoubleSequenceGenerator.Factory()),
++          new Pair<>("UniformA_", new UniformDoubleSequenceGenerator.Factory(1.0)),
++          new Pair<>("GaussianA_", new GaussianDoubleSequenceGenerator.Factory(1.0, 1.0)),
++          new Pair<>("UniformB_", new UniformDoubleSequenceGenerator.Factory(10.0)),
++          new Pair<>("GaussianB_", new GaussianDoubleSequenceGenerator.Factory(15.0, 3.0)));
 +  protected final List<File> files = new ArrayList<>();
 +  protected final List<TsFileResource> tsFileResources = new ArrayList<>();
 +  protected IPartitionFetcher partitionFetcher;
 +  // keyed by deviceId; the table is not partitioned by time in this simple test
 +  protected Map<String, TRegionReplicaSet> partitionTable = new HashMap<>();
 +  protected Map<TConsensusGroupId, DataRegion> dataRegionMap = new HashMap<>();
 +  protected Map<ConsensusGroupId, TRegionReplicaSet> groupId2ReplicaSetMap = new HashMap<>();
 +  protected IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
 +      internalServiceClientManager;
 +
 +  private int groupSizeInByte;
 +
 +  @Before
 +  public void setup() throws IOException, WriteProcessException, DataRegionException {
 +    setupFiles();
 +    logger.info("{} files set up", files.size());
 +    partitionFetcher = dummyPartitionFetcher();
 +    setupPartitionTable();
 +    setupClientManager();
 +    groupSizeInByte = TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
 +    TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte((int) (GB));
 +  }
 +
 +  @After
 +  public void cleanup() {
 +    FileUtils.deleteDirectory(new File(BASE_OUTPUT_PATH));
 +    FileUtils.deleteDirectory(new File("target" + File.separator + "data"));
 +    TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(groupSizeInByte);
 +  }
 +
 +  public int expectedChunkNum() {
 +    double totalTimeRange = chunkTimeRangeRatio * fileNum;
 +    int splitChunkNum = 0;
 +    // if the boundary of the i-th partition does not coincide with a chunk boundary, it
 +    // introduces an additional split
 +    // TODO: floating-point precision may make this calculation inaccurate. Also, if the data
 +    // amount is too large, a series may have more than one chunk per file.
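++    // Worked example with the defaults above: totalTimeRange = 0.1, so the loop only
++    // evaluates i = 0, no boundary split is added, and the result is
++    // (0 + fileNum) * seriesNum * 2 = 200_000.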
 +    for (int i = 0; i <= totalTimeRange; i++) {
 +      if (i * 1.0 % chunkTimeRangeRatio > 0.00001) {
 +        splitChunkNum += 1;
 +      }
 +    }
 +    return (splitChunkNum + fileNum) * seriesNum * 2;
 +  }
 +
 +  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
 +      throws TException, IOException {
 +    return new TLoadResp()
 +        .setAccepted(true)
 +        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 +  }
 +
 +  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint)
 +      throws LoadFileException, IOException {
 +    return new TLoadResp()
 +        .setAccepted(true)
 +        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 +  }
 +
 +  public TProtocol dummyProtocol() throws TTransportException {
 +    return new TBinaryProtocol(new TByteBuffer(ByteBuffer.allocate(0)));
 +  }
 +
 +  public void setupClientManager() {
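++    // Builds a stub client manager: borrowed clients short-circuit the two load
++    // RPCs back into this test's handleTsFilePieceNode and handleTsLoadCommand
++    // instead of going over the network.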
 +    SyncDataNodeInternalServiceClientPoolFactory poolFactory =
 +        new SyncDataNodeInternalServiceClientPoolFactory();
 +    internalServiceClientManager =
 +        new ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>(poolFactory) {
 +          @Override
 +          public SyncDataNodeInternalServiceClient borrowClient(TEndPoint node) {
 +            try {
 +              return new SyncDataNodeInternalServiceClient(
 +                  dummyProtocol(), new ThriftClientProperty.Builder().build(), node, this) {
 +                @Override
 +                public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
 +                  try {
 +                    return handleTsFilePieceNode(req, getTEndpoint());
 +                  } catch (IOException e) {
 +                    throw new TException(e);
 +                  }
 +                }
 +
 +                @Override
 +                public TLoadResp sendLoadCommand(TLoadCommandReq req) throws TException {
 +                  try {
 +                    return handleTsLoadCommand(req, getTEndpoint());
 +                  } catch (LoadFileException | IOException e) {
 +                    throw new TException(e);
 +                  }
 +                }
 +
 +                @Override
-                 public void close() {
-                 }
++                public void close() {}
 +              };
 +            } catch (TTransportException e) {
 +              throw new RuntimeException(e);
 +            }
 +          }
 +
 +          @Override
-           public void clear(TEndPoint node) {
-           }
++          public void clear(TEndPoint node) {}
 +
 +          @Override
-           public void close() {
-           }
++          public void close() {}
 +        };
 +  }
 +
 +  public void setupPartitionTable() throws DataRegionException {
 +    ConsensusGroupId d1GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
 +    TRegionReplicaSet d1Replicas =
 +        new TRegionReplicaSet(
 +            d1GroupId.convertToTConsensusGroupId(),
 +            Arrays.asList(
 +                new TDataNodeLocation()
 +                    .setDataNodeId(0)
 +                    .setInternalEndPoint(new TEndPoint("localhost", 10000)),
 +                new TDataNodeLocation()
 +                    .setDataNodeId(1)
 +                    .setInternalEndPoint(new TEndPoint("localhost", 10001)),
 +                new TDataNodeLocation()
 +                    .setDataNodeId(2)
 +                    .setInternalEndPoint(new TEndPoint("localhost", 10002))));
 +
-     WALRecoverManager.getInstance()
-         .setAllDataRegionScannedLatch(new CountDownLatch(0));
-     DataRegion dataRegion = new DataRegion(BASE_OUTPUT_PATH, d1GroupId.toString(),
-         new DirectFlushPolicy(), "root.loadTest");
++    WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(0));
++    DataRegion dataRegion =
++        new DataRegion(
++            BASE_OUTPUT_PATH, d1GroupId.toString(), new DirectFlushPolicy(), "root.loadTest");
 +    for (int i = 0; i < deviceNum; i++) {
 +      partitionTable.put("d" + i, d1Replicas);
 +      dataRegionMap.put(d1GroupId.convertToTConsensusGroupId(), dataRegion);
 +    }
 +
 +    groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
 +    ConsensusGroupId d2GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 1);
 +    TRegionReplicaSet d2Replicas =
 +        new TRegionReplicaSet(
 +            d2GroupId.convertToTConsensusGroupId(),
 +            Arrays.asList(
 +                new TDataNodeLocation()
 +                    .setDataNodeId(3)
 +                    .setInternalEndPoint(new TEndPoint("localhost", 10003)),
 +                new TDataNodeLocation()
 +                    .setDataNodeId(4)
 +                    .setInternalEndPoint(new TEndPoint("localhost", 10004)),
 +                new TDataNodeLocation()
 +                    .setDataNodeId(5)
 +                    .setInternalEndPoint(new TEndPoint("localhost", 10005))));
 +    partitionTable.put("dd1", d2Replicas);
 +    groupId2ReplicaSetMap.put(d2GroupId, d2Replicas);
 +  }
 +
 +  public IPartitionFetcher dummyPartitionFetcher() {
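++    // A placeholder fetcher: these tests resolve partitions through the
++    // dummyDataPartitionBatchFetcher override below, so the methods here appear
++    // safe to leave returning null.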
 +    return new IPartitionFetcher() {
 +      @Override
 +      public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
 +        return null;
 +      }
 +
 +      @Override
-       public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
++      public SchemaPartition getOrCreateSchemaPartition(
++          PathPatternTree patternTree, String userName) {
 +        return null;
 +      }
 +
 +      @Override
 +      public DataPartition getDataPartition(
 +          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
 +        return null;
 +      }
 +
 +      @Override
 +      public DataPartition getDataPartitionWithUnclosedTimeRange(
 +          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
 +        return null;
 +      }
 +
 +      @Override
 +      public DataPartition getOrCreateDataPartition(
 +          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
 +        return null;
 +      }
 +
 +      @Override
 +      public DataPartition getOrCreateDataPartition(
-           List<DataPartitionQueryParam> dataPartitionQueryParams) {
++          List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) {
 +        return null;
 +      }
 +
 +      @Override
 +      public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
-           PathPatternTree patternTree, Integer level) {
++          PathPatternTree patternTree, PathPatternTree scope, Integer level) {
 +        return null;
 +      }
 +
 +      @Override
 +      public boolean updateRegionCache(TRegionRouteReq req) {
 +        return false;
 +      }
 +
 +      @Override
-       public void invalidAllCache() {
-       }
++      public void invalidAllCache() {}
 +    };
 +  }
 +
 +  public DataPartitionBatchFetcher dummyDataPartitionBatchFetcher() {
 +    return new DataPartitionBatchFetcher(partitionFetcher) {
 +      @Override
 +      public List<TRegionReplicaSet> queryDataPartition(
-           List<Pair<String, TTimePartitionSlot>> slotList) {
++          List<Pair<String, TTimePartitionSlot>> slotList, String userName) {
 +        return slotList.stream().map(p -> partitionTable.get(p.left)).collect(Collectors.toList());
 +      }
 +    };
 +  }
 +
 +  public void setupFiles() {
 +    measurementSequenceGeneratorPairs.sort(Comparator.comparing(Pair::getLeft));
-     List<Pair<MeasurementSchema, DoubleSequenceGeneratorFactory>> schemaGeneratorPairs = new ArrayList<>();
++    List<Pair<MeasurementSchema, DoubleSequenceGeneratorFactory>> schemaGeneratorPairs =
++        new ArrayList<>();
 +    for (int sn = 0; sn < seriesNum; sn++) {
-       Pair<String, DoubleSequenceGeneratorFactory> measurementGeneratorPair = measurementSequenceGeneratorPairs.get(
-           sn % measurementSequenceGeneratorPairs.size());
++      Pair<String, DoubleSequenceGeneratorFactory> measurementGeneratorPair =
++          measurementSequenceGeneratorPairs.get(sn % measurementSequenceGeneratorPairs.size());
 +      MeasurementSchema measurementSchema =
-           new MeasurementSchema(measurementGeneratorPair
-               .left + sn,
-               TSDataType.DOUBLE);
++          new MeasurementSchema(measurementGeneratorPair.left + sn, TSDataType.DOUBLE);
 +      measurementSchema.setCompressor(CompressionType.LZ4.serialize());
 +      measurementSchema.setEncoding(TSEncoding.PLAIN.serialize());
 +      schemaGeneratorPairs.add(new Pair<>(measurementSchema, measurementGeneratorPair.right));
 +    }
 +    schemaGeneratorPairs.sort(Comparator.comparing(s -> s.left.getMeasurementId()));
-     List<MeasurementSchema> measurementSchemas = schemaGeneratorPairs.stream().map(m -> m.left)
-         .collect(
-             Collectors.toList());
++    List<MeasurementSchema> measurementSchemas =
++        schemaGeneratorPairs.stream().map(m -> m.left).collect(Collectors.toList());
 +    IntStream.range(0, fileNum)
 +        .parallel()
 +        .forEach(
 +            i -> {
 +              try {
 +                File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
 +                TsFileResource tsFileResource = new TsFileResource(file);
 +                synchronized (files) {
 +                  files.add(file);
 +                }
 +
 +                try (TsFileWriter writer = new TsFileWriter(file)) {
 +                  // sn non-aligned series under each of d0..d{deviceNum-1} and one aligned
 +                  // series with sn measurements under dd1
 +                  for (int sn = 0; sn < seriesNum; sn++) {
 +                    for (int dn = 0; dn < deviceNum; dn++) {
-                       writer.registerTimeseries(new Path("d" + dn),
-                           schemaGeneratorPairs.get(sn).left);
++                      writer.registerTimeseries(
++                          new Path("d" + dn), schemaGeneratorPairs.get(sn).left);
 +                    }
 +                  }
 +                  writer.registerAlignedTimeseries(new Path("dd1"), measurementSchemas);
 +
 +                  // one chunk for each series
 +                  long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
 +                  long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
 +                  int chunkPointNum = (int) (chunkTimeRange / pointInterval);
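++                  // e.g. with a hypothetical one-week partition interval
++                  // (604_800_000 ms): chunkTimeRange = 604_800 ms and chunkPointNum = 12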
 +
 +                  Tablet tablet = new Tablet("d0", measurementSchemas, chunkPointNum);
 +                  for (int sn = 0; sn < seriesNum; sn++) {
 +                    DoubleSequenceGenerator sequenceGenerator =
-                         schemaGeneratorPairs.get(sn).right
-                             .create();
++                        schemaGeneratorPairs.get(sn).right.create();
 +                    for (int pn = 0; pn < chunkPointNum; pn++) {
 +                      if (sn == 0) {
 +                        long currTime = chunkTimeRange * i + pointInterval * pn;
 +                        tablet.addTimestamp(pn, currTime);
 +                      }
-                       tablet.addValue(schemaGeneratorPairs.get(sn).left.getMeasurementId(), pn,
++                      tablet.addValue(
++                          schemaGeneratorPairs.get(sn).left.getMeasurementId(),
++                          pn,
 +                          sequenceGenerator.gen(pn));
 +                    }
 +                  }
 +
 +                  tablet.rowSize = chunkPointNum;
 +                  for (int dn = 0; dn < deviceNum; dn++) {
 +                    tablet.deviceId = "d" + dn;
 +                    writer.write(tablet);
 +                  }
 +                  tablet.deviceId = "d2";
 +                  // writer.writeAligned(tablet);
 +
 +                  writer.flushAllChunkGroups();
 +                  for (int dn = 0; dn < deviceNum; dn++) {
 +                    tsFileResource.updateStartTime("d" + dn, chunkTimeRange * i);
 +                    tsFileResource.updateEndTime("d" + dn, chunkTimeRange * (i + 1));
 +                  }
 +                }
 +
 +                tsFileResource.close();
 +                synchronized (tsFileResources) {
 +                  tsFileResources.add(tsFileResource);
 +                }
 +              } catch (IOException | WriteProcessException e) {
 +                throw new RuntimeException(e);
 +              }
 +            });
 +    tsFileResources.sort(Comparator.comparingLong(TsFileResource::getVersion));
 +  }
 +
 +  public static String getTestTsFilePath(
 +      String logicalStorageGroupName,
 +      long virtualStorageGroupId,
 +      long timePartitionId,
 +      long tsFileVersion) {
 +    String filePath =
 +        String.format(
 +            TEST_TSFILE_PATH, logicalStorageGroupName, virtualStorageGroupId, timePartitionId);
 +    return TsFileGeneratorUtils.getTsFilePath(filePath, tsFileVersion);
 +  }
 +}
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index a1509216852,00000000000..3f605a36247
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@@ -1,329 -1,0 +1,337 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.load;
 +
- import java.util.concurrent.atomic.AtomicLong;
 +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 +import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.common.rpc.thrift.TSStatus;
 +import org.apache.iotdb.commons.consensus.ConsensusGroupId;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 +import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 +import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 +import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 +import org.apache.iotdb.rpc.TSStatusCode;
 +import org.apache.iotdb.tsfile.compress.IUnCompressor;
 +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import org.apache.thrift.TException;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Comparator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.ConcurrentSkipListSet;
++import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +
 +import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 +import static org.junit.Assert.assertEquals;
 +
 +public class TsFileSplitSenderTest extends TestBase {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(TsFileSplitSenderTest.class);
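++  // keys: endpoint -> consensus group -> load UUID -> TsFile -> split ids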
 +  protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>>
 +      phaseOneResults = new ConcurrentSkipListMap<>();
 +  // the third key is the load UUID, the value is the command type
 +  protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> phaseTwoResults =
 +      new ConcurrentSkipListMap<>();
 +  // simulating network delay and packet loss
-   private long dummyDelayMS = 000;
++  private long dummyDelayMS = 0;
 +  private double packetLossRatio = 0.00;
 +  private Random random = new Random();
 +  private long maxSplitSize = 128 * 1024 * 1024;
 +  // simulating JVM stalls such as GC pauses
 +  private long minStuckIntervalMS = 50000;
 +  private long maxStuckIntervalMS = 100000;
 +  private long stuckDurationMS = 0;
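++  // With the defaults above (dummyDelayMS = 0, packetLossRatio = 0,
++  // stuckDurationMS = 0) all fault injection is disabled; raising them exercises
++  // the sender's retry and relay paths.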
 +
 +  private long nodeThroughput = 10_000;
 +
 +  protected Map<TEndPoint, Pair<Long, Long>> nextStuckTimeMap = new ConcurrentHashMap<>();
 +  private AtomicLong sumHandleTime = new AtomicLong();
 +  private AtomicLong decompressTime = new AtomicLong();
 +  private AtomicLong deserializeTime = new AtomicLong();
 +  private AtomicLong relayTime = new AtomicLong();
 +  private AtomicLong maxMemoryUsage = new AtomicLong();
 +
 +  @Test
 +  public void test() throws IOException {
-     Thread thread = new Thread(() -> {
-       while (!Thread.interrupted()) {
-         long preUsage = maxMemoryUsage.get();
-         long newUsage = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
-         if (preUsage < newUsage) {
-           maxMemoryUsage.set(newUsage);
-         }
-         try {
-           Thread.sleep(100);
-         } catch (InterruptedException e) {
-           return;
-         }
-       }
-     });
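++    // Background sampler: records peak JVM heap usage (total - free) every 100 ms
++    // until it is interrupted once the split finishes.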
++    Thread thread =
++        new Thread(
++            () -> {
++              while (!Thread.interrupted()) {
++                long preUsage = maxMemoryUsage.get();
++                long newUsage =
++                    Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
++                if (preUsage < newUsage) {
++                  maxMemoryUsage.set(newUsage);
++                }
++                try {
++                  Thread.sleep(100);
++                } catch (InterruptedException e) {
++                  return;
++                }
++              }
++            });
 +    thread.start();
 +
 +    LoadTsFileNode loadTsFileNode =
 +        new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
 +    DataPartitionBatchFetcher partitionBatchFetcher = dummyDataPartitionBatchFetcher();
 +    TsFileSplitSender splitSender =
 +        new TsFileSplitSender(
 +            loadTsFileNode,
 +            partitionBatchFetcher,
 +            TimePartitionUtils.getTimePartitionInterval(),
 +            internalServiceClientManager,
 +            false,
 +            maxSplitSize,
-             100);
++            100,
++            "root");
 +    long start = System.currentTimeMillis();
 +    splitSender.start();
 +    long timeConsumption = System.currentTimeMillis() - start;
 +    thread.interrupt();
 +
 +    printPhaseResult();
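++    // Rough wire-time estimate, treating nodeThroughput as bytes per millisecond
++    // (an assumption; 10_000 B/ms is roughly 10 MB/s).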
 +    long transmissionTime = splitSender.getStatistic().compressedSize.get() / nodeThroughput;
-     System.out.printf("Split ends after %dms + %dms (Transmission) = %dms\n", timeConsumption,
-         transmissionTime, timeConsumption + transmissionTime);
++    System.out.printf(
++        "Split ends after %dms + %dms (Transmission) = %dms\n",
++        timeConsumption, transmissionTime, timeConsumption + transmissionTime);
 +    System.out.printf("Handle sum %dns\n", sumHandleTime.get());
 +    System.out.printf("Decompress sum %dns\n", decompressTime.get());
 +    System.out.printf("Deserialize sum %dns\n", deserializeTime.get());
 +    System.out.printf("Relay sum %dns\n", relayTime.get());
 +    System.out.printf("Memory usage %dMB\n", maxMemoryUsage.get() / MB);
 +  }
 +
 +  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
 +      throws TException, IOException {
 +    long handleStart = System.nanoTime();
 +    if ((tEndpoint.getPort() - 10000) % 3 == 0
 +        && random.nextDouble() < packetLossRatio
 +        && req.isRelay) {
 +      throw new TException("Packet lost");
 +    }
 +    if ((tEndpoint.getPort() - 10000) % 3 == 1
 +        && random.nextDouble() < packetLossRatio / 2
 +        && req.isRelay) {
 +      throw new TException("Packet lost");
 +    }
 +
 +    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay && stuckDurationMS > 0) {
 +      Pair<Long, Long> nextStuckTime =
 +          nextStuckTimeMap.computeIfAbsent(
 +              tEndpoint,
 +              e ->
 +                  new Pair<>(
 +                      System.currentTimeMillis(), System.currentTimeMillis() + stuckDurationMS));
 +      long currTime = System.currentTimeMillis();
 +      if (currTime >= nextStuckTime.left && currTime < nextStuckTime.right) {
 +        logger.debug("Node{} stalls", tEndpoint.getPort() - 10000);
 +        try {
 +          Thread.sleep(nextStuckTime.right - currTime);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      } else if (currTime > nextStuckTime.right) {
 +        nextStuckTimeMap.compute(
 +            tEndpoint,
 +            (endPoint, newInterval) -> {
 +              if (newInterval != null && currTime < newInterval.right) {
 +                return newInterval;
 +              }
 +              long start =
 +                  currTime
 +                      + minStuckIntervalMS
 +                      + random.nextInt((int) (maxStuckIntervalMS - minStuckIntervalMS));
 +              return new Pair<>(start, start + stuckDurationMS);
 +            });
 +      }
 +    }
 +
 +    long decompressStart = System.nanoTime();
 +    ConsensusGroupId groupId =
 +        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
 +    ByteBuffer buf = req.body.slice();
 +    if (req.isSetCompressionType()) {
 +      CompressionType compressionType = CompressionType.deserialize(req.compressionType);
 +      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
 +      int uncompressedLength = req.getUncompressedLength();
 +      ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
-       unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(),
-           allocate.array(), 0);
++      unCompressor.uncompress(
++          buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(), allocate.array(), 0);
 +      allocate.limit(uncompressedLength);
 +      buf = allocate;
 +    }
 +    decompressTime.addAndGet(System.nanoTime() - decompressStart);
 +
 +    long deserializeStart = System.nanoTime();
 +    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
 +    deserializeTime.addAndGet(System.nanoTime() - deserializeStart);
 +    Set<Integer> splitIds =
 +        phaseOneResults
 +            .computeIfAbsent(
 +                tEndpoint,
 +                e -> new ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
 +            .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
 +            .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
 +            .computeIfAbsent(pieceNode.getTsFile(), f -> new ConcurrentSkipListSet<>());
 +    splitIds.addAll(
 +        pieceNode.getAllTsFileData().stream()
 +            .map(TsFileData::getSplitId)
 +            .collect(Collectors.toList()));
 +
 +    if (dummyDelayMS > 0) {
 +      if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
 +        try {
 +          Thread.sleep(dummyDelayMS);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +      if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.isRelay) {
 +        try {
 +          Thread.sleep(dummyDelayMS / 2);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +
 +    // forward to other replicas in the group
 +    if (req.isRelay) {
 +      long relayStart = System.nanoTime();
 +      req.isRelay = false;
 +      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
-       regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
-         TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
-         if (!otherPoint.equals(tEndpoint)) {
-           try {
-             handleTsFilePieceNode(req, otherPoint);
-           } catch (TException | IOException e) {
-             throw new RuntimeException(e);
-           }
-         }
-       });
++      regionReplicaSet.getDataNodeLocations().stream()
++          .parallel()
++          .forEach(
++              dataNodeLocation -> {
++                TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
++                if (!otherPoint.equals(tEndpoint)) {
++                  try {
++                    handleTsFilePieceNode(req, otherPoint);
++                  } catch (TException | IOException e) {
++                    throw new RuntimeException(e);
++                  }
++                }
++              });
 +      relayTime.addAndGet(System.nanoTime() - relayStart);
 +    }
 +
 +    sumHandleTime.addAndGet(System.nanoTime() - handleStart);
 +    return new TLoadResp()
 +        .setAccepted(true)
 +        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 +  }
 +
 +  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint) {
 +    ConsensusGroupId groupId =
 +        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
 +    phaseTwoResults
 +        .computeIfAbsent(
 +            tEndpoint,
 +            e -> new ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
 +        .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
 +        .computeIfAbsent(req.uuid, id -> req.commandType);
 +
 +    // forward to other replicas in the group
 +    if (req.useConsensus) {
 +      req.useConsensus = false;
 +      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
 +      for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
 +        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
 +        if (!otherPoint.equals(tEndpoint)) {
 +          handleTsLoadCommand(req, otherPoint);
 +        }
 +      }
 +    }
 +
 +    return new TLoadResp()
 +        .setAccepted(true)
 +        .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 +  }
 +
 +  public void printPhaseResult() {
 +    System.out.print("Phase one:\n");
 +    for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>>
 +        tEndPointMapEntry : phaseOneResults.entrySet()) {
 +      TEndPoint endPoint = tEndPointMapEntry.getKey();
 +      for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>> consensusGroupIdMapEntry :
 +          tEndPointMapEntry.getValue().entrySet()) {
 +        ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
 +        for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
 +            consensusGroupIdMapEntry.getValue().entrySet()) {
 +          String uuid = stringMapEntry.getKey();
 +          for (Entry<File, Set<Integer>> fileListEntry : stringMapEntry.getValue().entrySet()) {
 +            File tsFile = fileListEntry.getKey();
 +            Set<Integer> chunks = fileListEntry.getValue();
 +            System.out.printf(
 +                "%s - %s - %s - %s - %s chunks\n",
 +                endPoint, consensusGroupId, uuid, tsFile, chunks.size());
 +            //            if (consensusGroupId.getId() == 0) {
 +            //              // d1, non-aligned series
 +            //              assertEquals(expectedChunkNum() / 2, chunks.size());
 +            //            } else {
 +            //              // d2, aligned series
 +            //              assertEquals(expectedChunkNum() / 2 / seriesNum, chunks.size());
 +            //            }
 +          }
 +        }
 +      }
 +    }
 +
 +    System.out.print("Phase two:\n");
 +    for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> tEndPointMapEntry :
 +        phaseTwoResults.entrySet()) {
 +      TEndPoint endPoint = tEndPointMapEntry.getKey();
 +      for (Entry<ConsensusGroupId, Map<String, Integer>> consensusGroupIdMapEntry :
 +          tEndPointMapEntry.getValue().entrySet()) {
 +        ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
 +        for (Entry<String, Integer> stringMapEntry :
 +            consensusGroupIdMapEntry.getValue().entrySet()) {
 +          String uuid = stringMapEntry.getKey();
 +          int command = stringMapEntry.getValue();
 +          System.out.printf(
 +              "%s - %s - %s - %s\n",
 +              endPoint, consensusGroupId, uuid, LoadCommand.values()[command]);
 +          assertEquals(LoadCommand.EXECUTE.ordinal(), command);
 +        }
 +      }
 +    }
 +  }
 +}
diff --cc iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
index 00000000000,630b60ce20a..3b24e32baf0
mode 000000,100644..100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
@@@ -1,0 -1,79 +1,83 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.commons.utils;
+ 
+ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+ import org.apache.iotdb.commons.conf.CommonDescriptor;
+ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+ 
+ public class TimePartitionUtils {
+ 
+   /** Time range for partitioning a database; the time unit matches IoTDB's TimestampPrecision. */
+   private static long timePartitionInterval =
+       CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
+ 
+   public static TTimePartitionSlot getTimePartitionSlot(long time) {
++    return getTimePartitionSlot(time, timePartitionInterval);
++  }
++
++  public static TTimePartitionSlot getTimePartitionSlot(long time, long timePartitionInterval) {
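++    // Worked example (hypothetical interval of 1000): time 1500 -> slot start 1000;
++    // time -1 -> slot start -1000, because negative times must floor to the partition
++    // start rather than truncate toward zero as Java integer division does.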
+     TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+     if (time > 0 || time % timePartitionInterval == 0) {
+       timePartitionSlot.setStartTime(time / timePartitionInterval * timePartitionInterval);
+     } else {
+       timePartitionSlot.setStartTime((time / timePartitionInterval - 1) * timePartitionInterval);
+     }
+     return timePartitionSlot;
+   }
+ 
+   public static long getTimePartitionInterval() {
+     return timePartitionInterval;
+   }
+ 
+   public static long getTimePartitionUpperBound(long time) {
+     long upperBoundOfTimePartition;
+     if (time > 0 || time % TimePartitionUtils.timePartitionInterval == 0) {
+       upperBoundOfTimePartition =
+           (time / TimePartitionUtils.timePartitionInterval + 1)
+               * TimePartitionUtils.timePartitionInterval;
+     } else {
+       upperBoundOfTimePartition =
+           (time / TimePartitionUtils.timePartitionInterval)
+               * TimePartitionUtils.timePartitionInterval;
+     }
+     return upperBoundOfTimePartition;
+   }
+ 
+   public static long getTimePartitionId(long time) {
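++    // e.g. with a hypothetical interval of 1000: time 999 -> id 0, time 0 -> id 0,
++    // time -1 -> id -1 (floor-division semantics)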
+     return time > 0 || time % timePartitionInterval == 0
+         ? time / timePartitionInterval
+         : time / timePartitionInterval - 1;
+   }
+ 
+   public static boolean satisfyPartitionId(long startTime, long endTime, long partitionId) {
+     return getTimePartitionId(startTime) <= partitionId
+         && getTimePartitionId(endTime) >= partitionId;
+   }
+ 
+   public static boolean satisfyPartitionStartTime(Filter timeFilter, long partitionStartTime) {
+     return timeFilter == null
+         || timeFilter.satisfyStartEndTime(
+             partitionStartTime, partitionStartTime + timePartitionInterval);
+   }
+ 
+   public static void setTimePartitionInterval(long timePartitionInterval) {
+     TimePartitionUtils.timePartitionInterval = timePartitionInterval;
+   }
+ }