You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/10/16 01:39:25 UTC
[iotdb] 01/01: Merge branch 'master' into load_v2
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ffe7f4e15d78e0bd5db8ca7a1674cb0f8f2170d9
Merge: e7562e8a60b ba77da8a75a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Oct 16 09:43:33 2023 +0800
Merge branch 'master' into load_v2
# Conflicts:
# iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
# iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
# iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/mlnode/MLNodeInfo.java
.github/workflows/client-go.yml | 1 +
.github/workflows/client-python.yml | 4 +
.github/workflows/cluster-it-1c1d.yml | 1 +
.github/workflows/cluster-it-1c3d.yml | 1 +
.github/workflows/grafana-plugin.yml | 5 +
.github/workflows/iotdb-ml.yml | 1 +
.github/workflows/pipe-it-2cluster.yml | 1 +
.../{sonar-coveralls.yml => sonar-codecov.yml} | 22 +-
.github/workflows/unit-test.yml | 1 +
.gitignore | 12 +-
iotdb-core/mlnode/README.md => .mvn/extensions.xml | 42 +-
.mvn/gradle-enterprise.xml | 48 +
Jenkinsfile | 6 +-
LICENSE | 21 +-
NOTICE | 2 +-
NOTICE-binary | 2 +-
README.md | 4 +-
checkstyle.xml | 4 +-
code-coverage/pom.xml | 2 -
codecov.yml | 6 +-
distribution/pom.xml | 13 +-
.../src/assembly/flink-sql-connector.xml | 48 +-
example/client-cpp-example/pom.xml | 1 -
example/flink-sql/pom.xml | 99 ++
.../org/apache/iotdb/example/BatchSinkExample.java | 74 ++
.../apache/iotdb/example/BoundedScanExample.java | 52 +
.../java/org/apache/iotdb/example/CDCExample.java | 54 +
.../org/apache/iotdb/example/LookupExample.java | 77 ++
.../apache/iotdb/example/StreamingSinkExample.java | 68 ++
....apache.iotdb.db.protocol.mqtt.PayloadFormatter | 19 +
example/pom.xml | 1 +
example/rest-java-example/pom.xml | 4 -
example/session/pom.xml | 5 -
example/trigger/pom.xml | 2 +-
example/udf/pom.xml | 2 +-
integration-test/checkstyle.xml | 2 +-
integration-test/import-control.xml | 194 ++-
integration-test/pom.xml | 163 ++-
.../iotdb/it/env/cluster/ClusterConstant.java | 2 +
.../it/env/cluster/config/MppCommonConfig.java | 13 +
.../env/cluster/config/MppSharedCommonConfig.java | 14 +
.../iotdb/it/env/cluster/env/AbstractEnv.java | 60 +-
.../iotdb/it/env/cluster/env/Cluster1Env.java | 6 +
.../iotdb/it/env/cluster/env/MultiClusterEnv.java | 6 +
.../apache/iotdb/it/env/cluster/env/SimpleEnv.java | 6 +
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 9 +
.../it/env/remote/config/RemoteCommonConfig.java | 10 +
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 13 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 11 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +
.../apache/iotdb/cli/it/StartClientScriptIT.java | 32 -
.../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 27 -
.../it/cluster/IoTDBClusterRestartIT.java | 29 +-
.../it/database/IoTDBDatabaseSetAndDeleteIT.java | 23 +-
.../it/partition/IoTDBPartitionGetterIT.java | 8 +-
.../confignode/it/utils/ConfigNodeTestUtils.java | 7 -
.../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 110 +-
.../apache/iotdb/db/it/IoTDBDatetimeFormatIT.java | 1 +
.../apache/iotdb/db/it/IoTDBInsertMultiRowIT.java | 11 +
.../apache/iotdb/db/it/IoTDBInsertWithQueryIT.java | 23 +-
.../iotdb/db/it/IoTDBPartialInsertionIT.java | 71 ++
.../apache/iotdb/db/it/{ => auth}/IoTDBAuthIT.java | 529 ++++++--
.../it => db/it/auth}/IoTDBClusterAuthorityIT.java | 208 ++--
.../iotdb/db/it/auth/IoTDBSeriesPermissionIT.java | 305 +++++
.../iotdb/db/it/auth/IoTDBSystemPermissionIT.java | 329 +++++
.../db/it/auth/IoTDBTemplatePermissionIT.java | 161 +++
.../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java | 88 --
.../iotdb/db/it/schema/IoTDBMetadataFetchIT.java | 8 +-
.../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java | 29 +-
.../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java | 9 +-
.../db/it/strangepath/IoTDBStrangePathIT.java | 78 ++
.../db/it/trigger/IoTDBTriggerManagementIT.java | 117 +-
.../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java | 17 +-
.../org/apache/iotdb/db/it/utils/TestUtils.java | 211 +++-
.../org/apache/iotdb/flink/it/AbstractTest.java | 46 +
.../java/org/apache/iotdb/flink/it/SinkTest.java | 163 +++
.../java/org/apache/iotdb/flink/it/SourceTest.java | 192 +++
.../test/java/org/apache/iotdb/flink/it/Utils.java | 93 ++
.../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 766 ++++++++++++
...ncIT.java => IoTDBPipeConnectorParallelIT.java} | 56 +-
.../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java | 64 +-
.../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 645 ++++++++++
.../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 415 +++++++
.../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java | 328 +++++
.../apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java | 667 +++++++++++
.../pipe/it/extractor/IoTDBPipeExtractorIT.java | 605 ++++++++++
.../iotdb/session/it/IoTDBSessionComplexIT.java | 163 +++
.../iotdb/session/it/IoTDBSessionSimpleIT.java | 81 ++
iotdb-api/external-api/pom.xml | 2 +-
iotdb-api/pipe-api/pom.xml | 2 +-
.../org/apache/iotdb/pipe/api/PipeConnector.java | 10 +-
.../org/apache/iotdb/pipe/api/PipeProcessor.java | 10 +-
.../event/dml/insertion/TsFileInsertionEvent.java | 2 +-
iotdb-api/trigger-api/pom.xml | 2 +-
iotdb-api/udf-api/pom.xml | 2 +-
iotdb-client/cli/pom.xml | 2 -
.../cli/src/assembly/resources/sbin/start-cli.sh | 2 +-
.../java/org/apache/iotdb/cli/AbstractCli.java | 56 +-
.../src/main/java/org/apache/iotdb/cli/Cli.java | 4 -
.../java/org/apache/iotdb/tool/ExportTsFile.java | 14 +-
.../main/java/org/apache/iotdb/tool/ImportCsv.java | 2 +-
.../java/org/apache/iotdb/cli/AbstractCliIT.java | 39 -
iotdb-client/client-cpp/pom.xml | 7 +-
iotdb-client/client-py/.gitignore | 2 +
iotdb-client/client-py/SessionPoolExample.py | 21 +-
iotdb-client/client-py/iotdb/IoTDBContainer.py | 9 +-
iotdb-client/client-py/iotdb/Session.py | 113 +-
iotdb-client/client-py/iotdb/SessionPool.py | 60 +-
.../iotdb/tsfile/utils/ReadWriteIOUtils.py | 12 +-
iotdb-client/client-py/iotdb/utils/Field.py | 148 +--
.../client-py/iotdb/utils/IoTDBConstants.py | 33 +-
.../client-py/iotdb/utils/IoTDBRpcDataSet.py | 299 +++--
iotdb-client/client-py/iotdb/utils/NumpyTablet.py | 8 +-
.../client-py/iotdb/utils/SessionDataSet.py | 90 +-
iotdb-client/client-py/iotdb/utils/Tablet.py | 14 +-
iotdb-client/client-py/pom.xml | 62 +-
iotdb-client/client-py/release.sh | 19 +-
iotdb-client/client-py/requirements_dev.txt | 2 +-
iotdb-client/client-py/{ => resources}/setup.py | 2 +-
iotdb-client/client-py/tests/test_session.py | 60 +-
iotdb-client/client-py/tests/test_session_pool.py | 33 +-
iotdb-client/client-py/tests/test_tablet.py | 18 +-
iotdb-client/jdbc/pom.xml | 4 +-
iotdb-client/service-rpc/pom.xml | 2 +-
.../org/apache/iotdb/rpc/RpcTransportFactory.java | 25 +
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 29 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 6 +-
.../java/org/apache/iotdb/rpc/RpcUtilsTest.java | 67 ++
iotdb-client/session/pom.xml | 2 +-
.../iotdb/session/SessionConnectionTest.java | 388 ++++++
.../java/org/apache/iotdb/session/SessionTest.java | 1175 ++++++++++++++++++
.../session/pool/SessionPoolExceptionTest.java | 266 ++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 1264 +++++++++++++++++++-
.../iotdb/session/util/SessionUtilsTest.java | 176 +++
iotdb-connector/flink-sql-iotdb-connector/pom.xml | 33 +-
.../org/apache/iotdb/flink/sql/common/Options.java | 8 +-
.../org/apache/iotdb/flink/sql/common/Utils.java | 38 +-
.../sql/factory/IoTDBDynamicTableFactory.java | 96 +-
.../sql/function/IoTDBBoundedScanFunction.java | 50 +-
.../flink/sql/function/IoTDBCDCSourceFunction.java | 93 +-
.../flink/sql/function/IoTDBLookupFunction.java | 40 +-
.../flink/sql/function/IoTDBSinkFunction.java | 74 +-
iotdb-connector/grafana-connector/pom.xml | 20 +-
iotdb-connector/grafana-plugin/backend-compile.sh | 2 +-
iotdb-connector/grafana-plugin/go.mod | 85 +-
.../grafana-plugin/pkg/plugin/plugin.go | 2 +-
iotdb-connector/grafana-plugin/pom.xml | 11 +-
iotdb-connector/grafana-plugin/yarn.lock | 16 +-
iotdb-connector/hadoop/pom.xml | 4 +-
.../apache/iotdb/hadoop/fileSystem/HDFSOutput.java | 5 +
iotdb-connector/hive-connector/pom.xml | 4 +-
iotdb-connector/spark-iotdb-connector/pom.xml | 3 -
iotdb-connector/spark-tsfile/pom.xml | 1 -
iotdb-connector/zeppelin-interpreter/pom.xml | 14 +-
iotdb-core/antlr/pom.xml | 2 -
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 9 +-
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 60 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 49 +-
iotdb-core/confignode/pom.xml | 22 +-
.../resources/conf/iotdb-confignode.properties | 2 +-
.../client/async/AsyncDataNodeClientPool.java | 45 +-
.../client/sync/SyncConfigNodeClientPool.java | 3 -
.../iotdb/confignode/conf/ConfigNodeConfig.java | 44 +-
.../confignode/conf/ConfigNodeDescriptor.java | 48 +-
.../confignode/conf/SystemPropertiesUtils.java | 5 +-
.../consensus/request/ConfigPhysicalPlan.java | 10 +-
.../consensus/request/ConfigPhysicalPlanType.java | 60 +-
.../consensus/request/auth/AuthorPlan.java | 19 +
.../request/read/database/CountDatabasePlan.java | 15 +-
.../request/read/database/GetDatabasePlan.java | 5 +-
.../{ShowTrailPlan.java => GetModelInfoPlan.java} | 38 +-
.../{ShowTrailPlan.java => ShowTrialPlan.java} | 38 +-
.../read/partition/GetNodePathsPartitionPlan.java | 12 +
.../read/template/GetPathsSetTemplatePlan.java | 11 +-
.../request/write/model/UpdateModelInfoPlan.java | 16 +-
.../request/write/model/UpdateModelStatePlan.java | 18 +-
.../response/auth/PermissionInfoResp.java | 40 +-
.../GetModelInfoResp.java} | 36 +-
.../response/{ => model}/ModelTableResp.java | 2 +-
.../TrialTableResp.java} | 28 +-
.../statemachine/ConfigRegionStateMachine.java | 65 +-
.../iotdb/confignode/manager/ConfigManager.java | 157 ++-
.../apache/iotdb/confignode/manager/IManager.java | 33 +-
.../iotdb/confignode/manager/ModelManager.java | 99 +-
.../confignode/manager/PermissionManager.java | 56 +-
.../iotdb/confignode/manager/ProcedureManager.java | 29 +-
.../manager/consensus/ConsensusManager.java | 28 +-
.../iotdb/confignode/manager/cq/CQManager.java | 9 -
.../manager/load/balancer/RouteBalancer.java | 2 +-
.../iotdb/confignode/manager/node/NodeManager.java | 17 +-
.../manager/partition/PartitionManager.java | 5 +-
.../manager/partition/PartitionMetrics.java | 250 ++--
.../manager/pipe/runtime/PipeHeartbeatParser.java | 27 +-
.../pipe/runtime/PipeHeartbeatScheduler.java | 8 +-
.../manager/pipe/runtime/PipeMetaSyncer.java | 12 +-
.../manager/pipe/task/PipeTaskCoordinator.java | 35 +-
.../manager/pipe/task/PipeTaskCoordinatorLock.java | 103 ++
.../manager/schema/ClusterSchemaManager.java | 32 +-
.../iotdb/confignode/persistence/AuthorInfo.java | 417 +++++--
.../iotdb/confignode/persistence/ModelInfo.java | 148 ++-
.../iotdb/confignode/persistence/cq/CQInfo.java | 1 +
.../persistence/executor/ConfigPlanExecutor.java | 28 +-
.../confignode/persistence/node/NodeInfo.java | 7 +-
.../persistence/partition/PartitionInfo.java | 29 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 2 +-
.../persistence/schema/ClusterSchemaInfo.java | 27 +-
.../confignode/persistence/schema/ConfigMTree.java | 41 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 23 +-
.../procedure/env/DataNodeRemoveHandler.java | 2 +-
.../procedure/impl/model/CreateModelProcedure.java | 125 +-
.../procedure/impl/model/DropModelProcedure.java | 131 +-
.../impl/node/AddConfigNodeProcedure.java | 9 +
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 176 ++-
.../impl/pipe/task/CreatePipeProcedureV2.java | 2 +-
.../impl/schema/DeleteDatabaseProcedure.java | 2 +-
.../impl/sync/AuthOperationProcedure.java | 235 ++++
.../AuthOperationProcedureState.java} | 10 +-
.../procedure/state/model/CreateModelState.java | 2 -
.../procedure/store/ProcedureFactory.java | 6 +
.../confignode/procedure/store/ProcedureType.java | 5 +-
.../confignode/procedure/store/ProcedureWAL.java | 3 +
.../iotdb/confignode/service/ConfigNode.java | 65 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 89 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 75 +-
.../consensus/response/pipe/PipeTableRespTest.java | 6 +-
.../confignode/persistence/AuthorInfoTest.java | 974 ++++++++++++---
.../iotdb/confignode/persistence/NodeInfoTest.java | 2 +-
.../confignode/persistence/PartitionInfoTest.java | 2 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 4 +-
.../persistence/schema/ClusterSchemaInfoTest.java | 6 +-
.../persistence/schema/ConfigMTreeTest.java | 44 +-
.../impl/sync/AuthOperationProcedureTest.java | 98 ++
iotdb-core/consensus/pom.xml | 2 +-
.../org/apache/iotdb/consensus/IStateMachine.java | 5 +
.../apache/iotdb/consensus/config/RatisConfig.java | 50 +-
.../consensus/iot/IoTConsensusServerImpl.java | 64 +-
.../consensus/iot/IoTConsensusServerMetrics.java | 154 ++-
.../consensus/iot/client/DispatchLogHandler.java | 3 +-
.../iot/client/IoTConsensusClientPool.java | 11 +-
.../consensus/iot/logdispatcher/LogDispatcher.java | 3 +-
.../logdispatcher/LogDispatcherThreadMetrics.java | 50 +-
.../service/IoTConsensusRPCServiceProcessor.java | 3 +
.../ratis/ApplicationStateMachineProxy.java | 5 +
.../apache/iotdb/consensus/ratis/utils/Utils.java | 28 +
.../simple/SimpleConsensusServerImpl.java | 1 +
.../apache/iotdb/consensus/iot/ReplicateTest.java | 129 +-
.../consensus/ratis/{ => utils}/UtilsTest.java | 32 +-
iotdb-core/datanode/pom.xml | 26 +-
.../resources/conf/iotdb-datanode.properties | 2 +-
.../check-overlap-sequence-files-and-repair.bat | 62 +
.../check-overlap-sequence-files-and-repair.sh | 46 +-
.../tools/tsfile/overlap-statistic-tool.bat | 62 +
.../tools/tsfile/overlap-statistic-tool.sh | 46 +-
.../org/apache/iotdb/db/audit/AuditLogger.java | 7 +-
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 384 +++---
.../apache/iotdb/db/auth/AuthorizerManager.java | 432 -------
.../apache/iotdb/db/auth/BasicAuthorityCache.java | 12 +-
.../iotdb/db/auth/ClusterAuthorityFetcher.java | 445 +++++--
.../org/apache/iotdb/db/auth/IAuthorCache.java | 2 +
.../apache/iotdb/db/auth/IAuthorityFetcher.java | 14 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 27 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 27 +-
.../db/consensus/SchemaRegionConsensusImpl.java | 4 +
.../dataregion/DataExecutionVisitor.java | 2 +-
.../{ => runtime}/IntoProcessException.java | 2 +-
.../ModelInferenceProcessException.java} | 6 +-
.../{ => runtime}/WriteLockFailedException.java | 2 +-
.../pipe/agent/runtime/PipeCronEventInjector.java | 2 +-
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 5 +-
.../SimpleConsensusProgressIndexAssigner.java | 12 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 89 +-
.../config/constant/PipeConnectorConstant.java | 16 +
...eResponse.java => AirGapELanguageConstant.java} | 14 +-
.../payload/airgap/AirGapOneByteResponse.java | 4 +
...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 7 +-
...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 3 +-
.../protocol/airgap/IoTDBAirGapConnector.java | 33 +-
.../connector/protocol/opcua/OpcUaConnector.java | 290 +++++
.../protocol/opcua/OpcUaKeyStoreLoader.java | 120 ++
.../protocol/opcua/OpcUaServerBuilder.java | 287 +++++
.../thrift/async/IoTDBThriftAsyncConnector.java | 19 +-
.../thrift/sync/IoTDBThriftSyncConnector.java | 4 +-
...ocketConnector.java => WebSocketConnector.java} | 67 +-
.../websocket/WebSocketConnectorServer.java | 84 +-
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 54 +-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 150 ++-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 3 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 34 +-
.../tsfile/TsFileInsertionDataContainer.java | 3 +-
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 14 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 123 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 49 +-
.../PipeRealtimeDataRegionFakeExtractor.java | 2 +-
.../PipeRealtimeDataRegionHybridExtractor.java | 208 ++--
.../PipeRealtimeDataRegionLogExtractor.java | 40 +-
.../PipeRealtimeDataRegionTsFileExtractor.java | 40 +-
.../realtime/assigner/DisruptorQueue.java | 6 +
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../pipe/extractor/realtime/epoch/TsFileEpoch.java | 3 +-
.../realtime/epoch/TsFileEpochManager.java | 4 +-
.../matcher/CachedSchemaPatternMatcher.java | 6 +-
.../pipe/receiver/airgap/IoTDBAirGapReceiver.java | 58 +-
.../receiver/thrift/IoTDBThriftReceiverV1.java | 5 +-
.../resource/tsfile/PipeTsFileResourceManager.java | 16 +-
.../db/pipe/resource/wal/PipeWALResource.java | 2 +-
.../pipe/resource/wal/PipeWALResourceManager.java | 4 +
.../apache/iotdb/db/pipe/task/PipeTaskManager.java | 30 +-
.../pipe/task/connection/BlockingPendingQueue.java | 66 +-
.../db/pipe/task/connection/EnrichedDeque.java | 98 ++
.../pipe/task/connection/PipeEventCollector.java | 39 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 7 +-
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 54 +-
.../subtask/connector/PipeConnectorSubtask.java | 59 +-
.../connector/PipeConnectorSubtaskManager.java | 138 ++-
.../subtask/processor/PipeProcessorSubtask.java | 77 +-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 75 +-
.../iotdb/db/protocol/client/ConfigNodeInfo.java | 2 +-
.../db/protocol/client/DataNodeInternalClient.java | 2 +-
.../db/protocol/mqtt/BrokerAuthenticator.java | 14 +-
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 2 +
.../protocol/rest/filter/AuthorizationFilter.java | 6 +-
.../rest/handler/AuthorizationHandler.java | 19 +-
.../PingApiServiceImpl.java} | 41 +-
.../v1/handler/StatementConstructionHandler.java | 19 +-
.../v2/handler/StatementConstructionHandler.java | 20 +-
.../iotdb/db/protocol/session/SessionManager.java | 4 +-
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 10 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 33 +-
.../protocol/thrift/impl/MLNodeRPCServiceImpl.java | 14 +-
.../common/header/ColumnHeaderConstant.java | 9 +-
.../common/header/DatasetHeaderFactory.java | 4 +-
.../execution/exchange/SharedTsBlockQueue.java | 9 +
.../fragment/FragmentInstanceManager.java | 11 +
.../execution/load/AlignedChunkData.java | 4 +-
.../execution/load/DataPartitionBatchFetcher.java | 5 +-
.../load/DeviceBatchTsFileDataManager.java | 5 +-
.../execution/load/LoadTsFileManager.java | 7 +-
.../execution/load/MergedTsFileSplitter.java | 43 +-
.../execution/load/NonAlignedChunkData.java | 4 +-
.../execution/load/TsFileDataManager.java | 8 +-
.../execution/load/TsFileSplitSender.java | 54 +-
.../queryengine/execution/load/TsFileSplitter.java | 20 +-
.../nodesplit/ClusteringMeasurementSplitter.java | 167 +--
.../queryengine/execution/memory/MemoryPool.java | 1 +
.../execution/operator/AggregationUtil.java | 2 +-
.../process/AbstractConsumeAllOperator.java | 101 +-
.../operator/process/AbstractIntoOperator.java | 2 +-
.../operator/process/MergeSortOperator.java | 61 +-
.../process/join/RowBasedTimeJoinOperator.java | 57 +-
.../operator/process/last/LastQueryOperator.java | 6 +-
.../process/last/LastQuerySortOperator.java | 4 +-
.../process/last/LastQueryTransformOperator.java | 120 ++
.../operator/process/ml/ForecastOperator.java | 241 ++++
.../operator/schema/source/DeviceSchemaSource.java | 13 +-
.../schema/source/LogicalViewSchemaSource.java | 17 +-
.../operator/schema/source/NodeSchemaSource.java | 11 +-
.../schema/source/PathsUsingTemplateSource.java | 8 +-
.../schema/source/SchemaSourceFactory.java | 42 +-
.../schema/source/TimeSeriesSchemaSource.java | 14 +-
.../db/queryengine/plan/analyze/Analysis.java | 41 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 280 ++++-
.../db/queryengine/plan/analyze/Analyzer.java | 13 +-
.../plan/analyze/ClusterPartitionFetcher.java | 39 +-
.../plan/analyze/ConcatPathRewriter.java | 11 +-
.../plan/analyze/ExpressionAnalyzer.java | 7 +
.../plan/analyze/ExpressionTypeAnalyzer.java | 2 +
.../plan/analyze/IModelFetcher.java} | 9 +-
.../plan/analyze/IPartitionFetcher.java | 12 +-
.../plan/analyze/LoadTsfileAnalyzer.java | 126 +-
.../db/queryengine/plan/analyze/ModelFetcher.java | 75 ++
.../queryengine/plan/analyze/SelectIntoUtils.java | 10 +-
.../analyze/cache/partition/PartitionCache.java | 54 +-
.../analyze/cache/schema/DataNodeSchemaCache.java | 64 +-
.../schema/DeviceUsingTemplateSchemaCache.java | 18 +
.../analyze/cache/schema/SchemaCacheEntry.java | 16 +-
.../cache/schema/TimeSeriesSchemaCache.java | 71 +-
.../cache/schema/dualkeycache/IDualKeyCache.java | 22 +
.../dualkeycache/impl/CacheEntryGroupImpl.java | 6 +
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 159 ++-
.../dualkeycache/impl/FIFOCacheEntryManager.java | 60 +-
.../schema/dualkeycache/impl/ICacheEntry.java | 2 +
.../schema/dualkeycache/impl/ICacheEntryGroup.java | 4 +
.../dualkeycache/impl/ICacheEntryManager.java | 2 +
.../dualkeycache/impl/LRUCacheEntryManager.java | 86 +-
.../schema/lastcache/DataNodeLastCacheManager.java | 6 +-
.../schema/lastcache/ILastCacheContainer.java | 2 +-
.../analyze/schema/AutoCreateSchemaExecutor.java | 40 +
.../analyze/schema/ClusterSchemaFetchExecutor.java | 14 +-
.../plan/execution/config/ConfigExecution.java | 10 +-
.../plan/execution/config/ConfigTaskVisitor.java | 208 ++--
.../config/executor/ClusterConfigTaskExecutor.java | 317 +++--
.../config/executor/IConfigTaskExecutor.java | 12 +-
.../config/metadata/CreateContinuousQueryTask.java | 13 +-
.../config/metadata/CreatePipePluginTask.java | 2 +-
.../config/metadata/DropPipePluginTask.java | 2 +-
.../config/metadata/ShowClusterDetailsTask.java | 54 +-
.../execution/config/metadata/ShowClusterTask.java | 30 +-
.../config/metadata/model/CreateModelTask.java | 7 +-
.../config/metadata/model/ShowModelsTask.java | 12 +-
.../{ShowTrailsTask.java => ShowTrialsTask.java} | 10 +-
.../config/metadata/view/AlterLogicalViewTask.java | 10 +-
.../plan/execution/config/sys/AuthorizerTask.java | 7 +-
.../plan/expression/ExpressionFactory.java | 9 +
.../plan/expression/multi/FunctionExpression.java | 10 +
.../plan/expression/multi/FunctionType.java | 3 +-
.../visitor/RemoveRootPrefixVisitor.java | 45 +
.../db/queryengine/plan/parser/ASTVisitor.java | 174 ++-
.../plan/parser/StatementGenerator.java | 75 +-
.../plan/planner/LogicalPlanBuilder.java | 196 ++-
.../plan/planner/LogicalPlanVisitor.java | 67 +-
.../plan/planner/OperatorTreeGenerator.java | 107 +-
.../plan/planner/SubPlanTypeExtractor.java | 15 +
.../distribution/DistributionPlanContext.java | 5 +-
.../planner/distribution/DistributionPlanner.java | 10 +-
.../planner/distribution/ExchangeNodeAdder.java | 82 +-
.../planner/distribution/NodeGroupContext.java | 18 +-
.../SimpleFragmentParallelPlanner.java | 5 +-
.../plan/planner/distribution/SourceRewriter.java | 91 +-
.../db/queryengine/plan/planner/plan/SubPlan.java | 5 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 28 +
.../plan/planner/plan/node/PlanNodeType.java | 9 +-
.../plan/planner/plan/node/PlanVisitor.java | 10 +
.../plan/node/load/LoadSingleTsFileNode.java | 6 +-
.../plan/node/metedata/read/DevicesCountNode.java | 13 +-
.../node/metedata/read/DevicesSchemaScanNode.java | 13 +-
.../metedata/read/LevelTimeSeriesCountNode.java | 13 +-
.../metedata/read/LogicalViewSchemaScanNode.java | 17 +-
.../metedata/read/NodePathsSchemaScanNode.java | 12 +-
.../metedata/read/PathsUsingTemplateScanNode.java | 11 +-
.../node/metedata/read/SchemaQueryScanNode.java | 43 +-
.../node/metedata/read/TimeSeriesCountNode.java | 15 +-
.../metedata/read/TimeSeriesSchemaScanNode.java | 29 +-
.../node/process/last/LastQueryCollectNode.java | 17 +-
.../plan/node/process/last/LastQueryMergeNode.java | 24 +-
.../plan/node/process/last/LastQueryNode.java | 27 +-
...ollectNode.java => LastQueryTransformNode.java} | 92 +-
.../planner/plan/node/process/ml/ForecastNode.java | 122 ++
.../planner/plan/node/sink/IdentitySinkNode.java | 5 +
.../plan/node/source/AlignedLastQueryScanNode.java | 20 +-
.../plan/node/source/AlignedSeriesScanNode.java | 4 +-
.../plan/node/source/LastQueryScanNode.java | 20 +-
.../planner/plan/node/write/DeleteDataNode.java | 24 +-
.../plan/planner/plan/node/write/InsertNode.java | 1 +
.../planner/plan/node/write/InsertRowNode.java | 6 +-
.../planner/plan/node/write/InsertRowsNode.java | 4 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 4 +-
.../planner/plan/node/write/InsertTabletNode.java | 78 +-
.../plan/node/write/PipeEnrichedInsertNode.java | 1 +
.../model/ForecastModelInferenceDescriptor.java | 176 +++
.../parameter/model/ModelInferenceDescriptor.java | 111 ++
.../plan/scheduler/load/LoadTsFileScheduler.java | 8 +-
.../statement/AuthorityInformationStatement.java | 48 +
.../db/queryengine/plan/statement/Statement.java | 13 +-
.../plan/statement/StatementVisitor.java | 12 +-
.../plan/statement/component/IntoComponent.java | 2 +
.../plan/statement/component/ResultColumn.java | 5 +-
.../plan/statement/component/SelectComponent.java | 10 +-
.../plan/statement/crud/DeleteDataStatement.java | 17 +
.../plan/statement/crud/InsertBaseStatement.java | 17 +
.../plan/statement/crud/InsertRowStatement.java | 4 +-
.../crud/InsertRowsOfOneDeviceStatement.java | 5 +-
.../plan/statement/crud/InsertStatement.java | 17 +
.../plan/statement/crud/InsertTabletStatement.java | 17 +-
.../plan/statement/crud/LoadTsFileStatement.java | 8 +
.../crud/PipeEnrichedInsertBaseStatement.java | 8 +-
.../crud/PipeEnrichedLoadTsFileStatement.java | 5 -
.../plan/statement/crud/QueryStatement.java | 100 +-
.../InternalCreateMultiTimeSeriesStatement.java | 25 +-
.../InternalCreateTimeSeriesStatement.java | 17 +
.../metadata/AlterTimeSeriesStatement.java | 15 +
.../metadata/CreateAlignedTimeSeriesStatement.java | 17 +
.../metadata/CreateContinuousQueryStatement.java | 14 +
.../metadata/CreateFunctionStatement.java | 14 +
.../metadata/CreateMultiTimeSeriesStatement.java | 17 +
.../metadata/CreateTimeSeriesStatement.java | 15 +
.../statement/metadata/CreateTriggerStatement.java | 14 +
.../metadata/DatabaseSchemaStatement.java | 14 +
.../metadata/DeleteDatabaseStatement.java | 14 +
.../metadata/DeleteTimeSeriesStatement.java | 17 +
.../metadata/DropContinuousQueryStatement.java | 14 +
.../statement/metadata/DropFunctionStatement.java | 14 +
.../statement/metadata/DropTriggerStatement.java | 32 +-
.../plan/statement/metadata/SetTTLStatement.java | 21 +-
.../statement/metadata/ShowClusterStatement.java | 14 +
.../metadata/ShowConfigNodesStatement.java | 9 +
.../metadata/ShowContinuousQueriesStatement.java | 14 +
.../statement/metadata/ShowDataNodesStatement.java | 9 +
.../statement/metadata/ShowFunctionsStatement.java | 14 +
.../statement/metadata/ShowRegionStatement.java | 9 +
.../plan/statement/metadata/ShowStatement.java | 4 +-
.../statement/metadata/ShowTriggersStatement.java | 14 +
.../statement/metadata/ShowVariablesStatement.java | 14 +
.../metadata/model/CreateModelStatement.java | 71 +-
...ailsStatement.java => ShowTrialsStatement.java} | 6 +-
.../{ => pipe}/CreatePipePluginStatement.java | 16 +-
.../metadata/pipe/CreatePipeStatement.java | 14 +
.../{ => pipe}/DropPipePluginStatement.java | 16 +-
.../statement/metadata/pipe/DropPipeStatement.java | 14 +
.../{ => pipe}/ShowPipePluginsStatement.java | 17 +-
.../metadata/pipe/ShowPipesStatement.java | 14 +
.../metadata/pipe/StartPipeStatement.java | 14 +
.../statement/metadata/pipe/StopPipeStatement.java | 14 +
.../template/ActivateTemplateStatement.java | 30 +-
.../template/BatchActivateTemplateStatement.java | 36 +-
.../template/DeactivateTemplateStatement.java | 39 +-
.../ShowNodesInSchemaTemplateStatement.java | 9 +
.../template/ShowPathSetTemplateStatement.java | 9 +
.../template/ShowSchemaTemplateStatement.java | 9 +
.../metadata/view/AlterLogicalViewStatement.java | 45 +
.../metadata/view/CreateLogicalViewStatement.java | 39 +
.../metadata/view/DeleteLogicalViewStatement.java | 17 +
.../metadata/view/RenameLogicalViewStatement.java | 19 +
.../plan/statement/sys/AuthorStatement.java | 136 ++-
.../plan/statement/sys/KillQueryStatement.java | 14 +
.../plan/statement/sys/ShowQueriesStatement.java | 14 +
.../plan/statement/sys/ShowVersionStatement.java | 9 +
.../MultiInputColumnIntermediateLayer.java | 10 +-
.../schemaengine/metric/SchemaEngineMemMetric.java | 24 -
.../schemaengine/metric/SchemaMetricManager.java | 3 +-
.../metric/SchemaRegionCachedMetric.java | 4 +-
.../schemaengine/metric/SchemaRegionMemMetric.java | 36 +-
.../schemaengine/schemaregion/ISchemaRegion.java | 2 +-
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 4 +-
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 4 +-
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 54 +-
.../impl/mem/snapshot/MemMTreeSnapshotUtil.java | 9 +-
.../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java | 56 +-
.../schemaregion/mtree/traverser/Traverser.java | 11 +-
.../mtree/traverser/basic/DatabaseTraverser.java | 10 +-
.../mtree/traverser/basic/EntityTraverser.java | 11 +-
.../mtree/traverser/basic/MNodeTraverser.java | 11 +-
.../traverser/basic/MeasurementTraverser.java | 12 +-
.../traverser/collector/DatabaseCollector.java | 9 +-
.../mtree/traverser/collector/EntityCollector.java | 9 +-
.../traverser/collector/MNodeAboveDBCollector.java | 9 +-
.../mtree/traverser/collector/MNodeCollector.java | 9 +-
.../traverser/collector/MeasurementCollector.java | 9 +-
.../mtree/traverser/counter/DatabaseCounter.java | 10 +-
.../mtree/traverser/counter/EntityCounter.java | 10 +-
.../traverser/counter/MeasurementCounter.java | 9 +-
.../mtree/traverser/updater/EntityUpdater.java | 11 +-
.../traverser/updater/MeasurementUpdater.java | 10 +-
.../schemaregion/read/req/IShowSchemaPlan.java | 3 +
.../read/req/SchemaRegionReadPlanFactory.java | 69 +-
.../read/req/impl/AbstractShowSchemaPlanImpl.java | 13 +-
.../read/req/impl/ShowDevicesPlanImpl.java | 6 +-
.../read/req/impl/ShowNodesPlanImpl.java | 6 +-
.../read/req/impl/ShowTimeSeriesPlanImpl.java | 6 +-
.../template/ClusterTemplateManager.java | 8 +-
.../db/schemaengine/template/ITemplateManager.java | 4 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 22 +-
.../apache/iotdb/db/service/MLNodeRPCService.java | 8 +-
.../iotdb/db/service/metrics/FileMetrics.java | 542 +--------
.../metrics/IoTDBInternalLocalReporter.java | 11 +-
.../metrics/file/CompactionFileMetrics.java | 165 +++
.../db/service/metrics/file/ModsFileMetrics.java | 85 ++
.../metrics/file/SystemRelatedFileMetrics.java | 109 ++
.../db/service/metrics/file/TsFileMetrics.java | 414 +++++++
.../db/service/metrics/file/WalFileMetrics.java | 58 +
.../iotdb/db/storageengine/StorageEngine.java | 31 +-
.../db/storageengine/dataregion/DataRegion.java | 144 +--
.../FileCannotTransitToCompactingException.java} | 31 +-
.../execute/task/AbstractCompactionTask.java | 71 +-
.../task/CompactionTaskType.java} | 16 +-
.../execute/task/CrossSpaceCompactionTask.java | 98 +-
.../execute/task/InnerSpaceCompactionTask.java | 112 +-
.../compaction/execute/utils/CompactionUtils.java | 86 +-
.../execute/utils/log/CompactionLogger.java | 34 +-
.../compaction/io/CompactionTsFileWriter.java | 34 +-
.../compaction/schedule/CompactionScheduler.java | 28 +-
.../compaction/schedule/CompactionTaskManager.java | 4 -
.../compaction/schedule/CompactionWorker.java | 72 +-
.../DefaultCompactionTaskComparatorImpl.java | 7 +
.../compaction/selector/ICompactionSelector.java | 3 +-
.../selector/IInnerSeqSpaceSelector.java | 3 +-
.../selector/IInnerUnseqSpaceSelector.java | 3 +-
.../impl/SizeTieredCompactionSelector.java | 121 +-
.../compaction/settle/SettleRequestHandler.java | 1 -
.../ITimeRange.java} | 14 +-
.../Interval.java} | 33 +-
.../compaction/tool/ListTimeRangeImpl.java | 89 ++
.../compaction/tool/OverlapStatistic.java | 91 ++
.../compaction/tool/OverlapStatisticTool.java | 248 ++++
.../dataregion/compaction/tool/PrintUtil.java | 209 ++++
.../SequenceFileSubTaskThreadExecutor.java} | 25 +-
.../compaction/tool/SequenceFileTaskSummary.java | 58 +
.../compaction/tool/SingleSequenceFileTask.java | 104 ++
.../compaction/tool/TimePartitionProcessTask.java | 143 +++
.../tool/TimePartitionProcessWorker.java | 58 +
.../compaction/tool/TsFileStatisticReader.java | 96 ++
.../compaction/tool/UnseqSpaceStatistics.java | 84 ++
.../dataregion/flush/CompressionRatio.java | 132 +-
.../dataregion/memtable/AbstractMemTable.java | 60 +-
.../memtable/AlignedWritableMemChunk.java | 19 +-
.../dataregion/memtable/IMemTable.java | 6 +
.../dataregion/memtable/PrimitiveMemTable.java | 18 +-
.../dataregion/memtable/TsFileProcessor.java | 16 +-
.../dataregion/memtable/WritableMemChunk.java | 18 +-
.../io/LocalTextModificationAccessor.java | 36 +-
.../modification/io/ModificationWriter.java | 3 +
.../dataregion/snapshot/SnapshotLogger.java | 6 +-
.../dataregion/tsfile/TsFileManager.java | 30 +-
.../dataregion/tsfile/TsFileResource.java | 94 +-
.../tsfile/timeindex/DeviceTimeIndex.java | 9 +-
.../dataregion/tsfile/timeindex/FileTimeIndex.java | 6 +-
.../storageengine/dataregion/wal/WALManager.java | 27 +-
.../dataregion/wal/buffer/WALBuffer.java | 30 +-
.../storageengine/dataregion/wal/node/WALNode.java | 22 +-
.../dataregion/wal/recover/WALRecoverManager.java | 15 +-
.../wal/recover/file/TsFilePlanRedoer.java | 4 +-
.../file/UnsealedTsFileRecoverPerformer.java | 10 +-
.../dataregion/wal/utils/WALInsertNodeCache.java | 19 +-
.../storageengine/rescon/disk/FolderManager.java | 2 +
.../rescon/memory/MemTableManager.java | 8 +-
.../db/storageengine/rescon/memory/SystemInfo.java | 9 +-
.../iotdb/db/tools/TsFileSplitByPartitionTool.java | 10 +-
.../TsFileOverlapValidationAndRepairTool.java | 276 +++++
.../org/apache/iotdb/db/utils/CommonUtils.java | 7 +
.../apache/iotdb/db/utils/TimePartitionUtils.java | 49 -
.../iotdb/db/utils/TimestampPrecisionUtils.java | 44 +-
.../iotdb/db/utils/constant/SqlConstant.java | 3 +
.../iotdb/db/auth/AuthorizerManagerTest.java | 187 ++-
.../auth/authorizer/LocalFileAuthorizerTest.java | 78 +-
.../iotdb/db/auth/entity/PathPrivilegeTest.java | 52 +-
.../org/apache/iotdb/db/auth/entity/RoleTest.java | 23 +-
.../org/apache/iotdb/db/auth/entity/UserTest.java | 11 +-
.../db/auth/role/LocalFileRoleAccessorTest.java | 70 +-
.../db/auth/role/LocalFileRoleManagerTest.java | 99 +-
.../db/auth/user/LocalFileUserAccessorTest.java | 71 +-
.../db/auth/user/LocalFileUserManagerTest.java | 155 +--
.../cache/dualkeycache/DualKeyCacheTest.java | 185 ++-
.../schemaRegion/SchemaRegionAliasAndTagTest.java | 21 +-
.../schemaRegion/SchemaRegionBasicTest.java | 172 +--
.../schemaRegion/SchemaRegionManagementTest.java | 22 +-
.../schemaRegion/SchemaRegionTemplateTest.java | 6 +-
.../schemaRegion/SchemaRegionTestUtil.java | 90 +-
.../extractor/CachedSchemaPatternMatcherTest.java | 2 +-
.../db/protocol/mqtt/BrokerAuthenticatorTest.java | 15 +-
.../db/queryengine/execution/DataDriverTest.java | 1 +
.../execution/load/LoadTsFileManagerTest.java | 60 +-
.../execution/load/MergedTsFileSplitterTest.java | 2 +-
.../db/queryengine/execution/load/TestBase.java | 83 +-
.../execution/load/TsFileSplitSenderTest.java | 72 +-
.../operator/AlignedSeriesScanOperatorTest.java | 2 +
.../operator/HorizontallyConcatOperatorTest.java | 3 +
.../execution/operator/LimitOperatorTest.java | 2 +
.../execution/operator/MergeSortOperatorTest.java | 173 ++-
.../execution/operator/OffsetOperatorTest.java | 4 +
.../execution/operator/OperatorMemoryTest.java | 5 +-
.../operator/SingleDeviceViewOperatorTest.java | 2 +
.../execution/operator/SortOperatorTest.java | 3 +
.../schema/SchemaQueryScanOperatorTest.java | 12 +-
.../queryengine/plan/analyze/AnalyzeFailTest.java | 38 +-
.../db/queryengine/plan/analyze/AnalyzeTest.java | 25 +-
.../plan/analyze/FakePartitionFetcherImpl.java | 6 +-
.../plan/analyze/cache/PartitionCacheTest.java | 33 +-
.../plan/parser/StatementGeneratorTest.java | 353 +++++-
.../plan/plan/QueryLogicalPlanUtil.java | 2 +-
.../distribution/DistributionPlannerBasicTest.java | 10 +-
.../plan/plan/distribution/LastQueryTest.java | 2 +-
.../queryengine/plan/plan/distribution/Util.java | 7 +-
.../read/DeviceSchemaScanNodeSerdeTest.java | 4 +-
.../NodeManagementMemoryMergeNodeSerdeTest.java | 6 +-
.../read/PathsUsingTemplateScanNodeTest.java | 6 +-
.../metadata/read/SchemaCountNodeSerdeTest.java | 12 +-
.../read/TimeSeriesSchemaScanNodeSerdeTest.java | 4 +-
.../plan/node/write/WritePlanNodeSplitTest.java | 30 +-
.../security/encrypt/MessageDigestEncryptTest.java | 2 +-
.../iotdb/db/storageengine/StorageEngineTest.java | 14 +
.../storageengine/dataregion/DataRegionTest.java | 58 +-
.../compaction/CompactionTaskComparatorTest.java | 121 +-
.../compaction/CompactionTaskManagerTest.java | 26 +-
...yControlTest.java => CompactionWorkerTest.java} | 78 +-
.../CrossSpaceCompactionWithUnusualCasesTest.java | 9 -
.../compaction/FastAlignedCrossCompactionTest.java | 47 -
.../FastCompactionPerformerWithEmptyPageTest.java | 2 -
...InconsistentCompressionTypeAndEncodingTest.java | 7 -
.../FastCrossCompactionPerformerTest.java | 17 +-
.../FastInnerCompactionPerformerTest.java | 121 +-
.../FastNonAlignedCrossCompactionTest.java | 47 -
.../compaction/ReadChunkInnerCompactionTest.java | 19 +-
.../ReadPointAlignedCrossCompactionTest.java | 47 -
.../ReadPointNonAlignedCrossCompactionTest.java | 47 -
.../cross/CrossSpaceCompactionSelectorTest.java | 92 +-
.../CrossSpaceCompactionWithFastPerformerTest.java | 4 -
...eCompactionWithFastPerformerValidationTest.java | 173 +--
...sSpaceCompactionWithReadPointPerformerTest.java | 4 -
...actionWithReadPointPerformerValidationTest.java | 160 +--
...eCrossSpaceCompactionWithFastPerformerTest.java | 31 +-
...sSpaceCompactionWithReadPointPerformerTest.java | 31 +-
.../inner/InnerCompactionEmptyTsFileTest.java | 2 -
.../InnerSeqCompactionWithFastPerformerTest.java | 20 +-
...nerSeqCompactionWithReadChunkPerformerTest.java | 13 +-
.../inner/InnerSpaceCompactionSelectorTest.java | 112 +-
.../SizeTieredCompactionRecoverTest.java | 2 -
...eCrossSpaceCompactionRecoverCompatibleTest.java | 4 +-
.../SizeTieredCompactionRecoverCompatibleTest.java | 4 +-
.../recover/SizeTieredCompactionRecoverTest.java | 3 +-
.../settle/SettleRequestHandlerTest.java | 2 +-
.../compaction/tools/ListTimeRangeImplTest.java | 138 +++
.../compaction/tools/UnseqSpaceStatisticsTest.java | 63 +
.../utils/CompactionFileGeneratorUtils.java | 23 +-
.../utils/CompactionUpdateFileCountTest.java | 109 ++
.../utils/MultiTsFileDeviceIteratorTest.java | 7 -
.../dataregion/flush/CompressionRatioTest.java | 108 +-
.../dataregion/memtable/MemTableFlushTaskTest.java | 4 +-
.../dataregion/memtable/MemtableBenchmark.java | 4 +-
.../dataregion/memtable/PrimitiveMemTableTest.java | 16 +-
.../wal/checkpoint/CheckpointManagerTest.java | 8 +-
.../dataregion/wal/io/CheckpointFileTest.java | 8 +-
.../dataregion/wal/node/WALEntryHandlerTest.java | 20 +-
.../dataregion/wal/node/WALNodeTest.java | 10 +-
.../wal/recover/WALRecoverManagerTest.java | 13 +-
.../wal/utils/WALInsertNodeCacheTest.java | 161 ++-
.../TsFileOverlapValidationAndRepairToolTest.java | 193 +++
.../apache/iotdb/db/utils/EnvironmentUtils.java | 18 +-
.../db/utils/TimestampPrecisionUtilsTest.java | 87 ++
iotdb-core/metrics/interface/pom.xml | 1 -
.../metrics/metricsets/system/SystemMetrics.java | 31 +-
.../apache/iotdb/metrics/utils/SystemMetric.java | 1 +
iotdb-core/mlnode/.gitignore | 3 +-
iotdb-core/mlnode/README.md | 4 +-
.../mlnode/iotdb/mlnode/algorithm/factory.py | 66 +
.../iotdb/mlnode/algorithm/hyperparameter.py | 407 +++++++
iotdb-core/mlnode/iotdb/mlnode/algorithm/metric.py | 80 ++
.../mlnode/algorithm/models/forecast/__init__.py} | 10 -
.../mlnode/algorithm/models/forecast/dlinear.py | 133 ++
.../mlnode/algorithm/models/forecast/nbeats.py | 131 ++
.../mlnode/iotdb/mlnode/algorithm/validator.py | 45 +
iotdb-core/mlnode/iotdb/mlnode/client.py | 111 +-
iotdb-core/mlnode/iotdb/mlnode/config.py | 36 +
iotdb-core/mlnode/iotdb/mlnode/constant.py | 68 +-
iotdb-core/mlnode/iotdb/mlnode/dataset/dataset.py | 74 ++
iotdb-core/mlnode/iotdb/mlnode/dataset/factory.py | 50 +
iotdb-core/mlnode/iotdb/mlnode/dataset/source.py | 80 ++
.../mlnode/dataset/utils/__init__.py} | 10 -
.../iotdb/mlnode/dataset/utils/time_features.py | 149 +++
iotdb-core/mlnode/iotdb/mlnode/exception.py | 39 +-
iotdb-core/mlnode/iotdb/mlnode/handler.py | 63 +-
iotdb-core/mlnode/iotdb/mlnode/parser.py | 99 ++
.../mlnode/process/__init__.py} | 10 -
iotdb-core/mlnode/iotdb/mlnode/process/manager.py | 124 ++
iotdb-core/mlnode/iotdb/mlnode/process/task.py | 265 ++++
iotdb-core/mlnode/iotdb/mlnode/process/trial.py | 224 ++++
iotdb-core/mlnode/iotdb/mlnode/serde.py | 147 ++-
iotdb-core/mlnode/iotdb/mlnode/service.py | 4 -
iotdb-core/mlnode/iotdb/mlnode/storage.py | 28 +-
iotdb-core/mlnode/iotdb/mlnode/util.py | 25 +-
iotdb-core/mlnode/pom.xml | 2 -
iotdb-core/mlnode/requirements.txt | 7 +-
iotdb-core/mlnode/resources/conf/iotdb-mlnode.toml | 14 +-
.../mlnode/test/test_create_forecast_model.py | 100 ++
iotdb-core/mlnode/test/test_model_storage.py | 17 +-
iotdb-core/mlnode/test/test_serde.py | 8 +-
iotdb-core/node-commons/pom.xml | 5 +-
.../resources/conf/iotdb-common.properties | 20 +-
.../commons/auth/authorizer/BasicAuthorizer.java | 153 +--
.../iotdb/commons/auth/authorizer/IAuthorizer.java | 28 +-
.../commons/auth/authorizer/OpenIdAuthorizer.java | 2 +-
.../iotdb/commons/auth/entity/PathPrivilege.java | 152 ++-
.../commons/auth/entity/PriPrivilegeType.java | 91 ++
.../iotdb/commons/auth/entity/PrivilegeType.java | 92 +-
.../org/apache/iotdb/commons/auth/entity/Role.java | 230 +++-
.../org/apache/iotdb/commons/auth/entity/User.java | 171 ++-
.../iotdb/commons/auth/role/BasicRoleManager.java | 174 ++-
.../iotdb/commons/auth/role/IRoleAccessor.java | 9 +-
.../iotdb/commons/auth/role/IRoleManager.java | 19 +-
.../commons/auth/role/LocalFileRoleAccessor.java | 147 ++-
.../commons/auth/role/LocalFileRoleManager.java | 7 +
.../iotdb/commons/auth/user/BasicUserManager.java | 232 ++--
.../iotdb/commons/auth/user/IUserAccessor.java | 3 +
.../iotdb/commons/auth/user/IUserManager.java | 13 +-
.../commons/auth/user/LocalFileUserAccessor.java | 290 ++++-
.../commons/auth/user/LocalFileUserManager.java | 6 +
.../iotdb/commons/client/ClientPoolFactory.java | 26 +
.../iotdb/commons/client/mlnode}/MLNodeClient.java | 144 ++-
.../mlnode/MLNodeClientManager.java} | 28 +-
.../iotdb/commons/client/mlnode/MLNodeInfo.java} | 11 +-
.../client/property/ThriftClientProperty.java | 3 +-
.../concurrent/ExceptionalCountDownLatch.java | 54 +
.../apache/iotdb/commons/conf/CommonConfig.java | 69 +-
.../iotdb/commons/conf/CommonDescriptor.java | 31 +-
.../commons/consensus/index/ProgressIndex.java | 4 +-
.../consensus/index/impl/HybridProgressIndex.java | 2 +-
.../consensus/index/impl/MinimumProgressIndex.java | 8 +-
.../exception/IllegalPrivilegeException.java | 30 +-
.../PipeRuntimeConnectorCriticalException.java | 6 +-
.../pipe/PipeRuntimeCriticalException.java | 5 -
.../pipe/PipeRuntimeNonCriticalException.java | 5 -
.../commons/model/ForecastModeInformation.java | 153 +++
.../iotdb/commons/model/ModelInformation.java | 235 ++--
...TrailInformation.java => TrialInformation.java} | 34 +-
.../iotdb/commons/partition/DataPartition.java | 17 +-
.../apache/iotdb/commons/path/PathPatternTree.java | 19 +-
.../apache/iotdb/commons/path/fa/FAFactory.java | 2 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 21 +-
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 2 +
.../plugin/builtin/connector/OpcUaConnector.java} | 23 +-
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 26 +-
.../iotdb/commons/schema/SchemaConstant.java | 18 +-
.../service/metric/JvmGcMonitorMetrics.java | 35 +-
.../iotdb/commons/service/metric/enums/Metric.java | 3 +
.../udf/builtin/ModelInferenceFunction.java} | 36 +-
.../commons/udf/service/UDFManagementService.java | 19 +
.../org/apache/iotdb/commons/utils/AuthUtils.java | 258 ++--
.../org/apache/iotdb/commons/utils/IOUtils.java | 115 +-
.../iotdb/commons/utils/TimePartitionUtils.java | 83 ++
.../iotdb/commons/path/PathPatternTreeTest.java | 9 +-
.../iotdb/commons/pipe/PipeMetaDeSerTest.java | 2 +-
.../apache/iotdb/commons/utils/AuthUtilsTest.java | 203 ++++
iotdb-core/tsfile/pom.xml | 4 +-
.../tsfile/file/metadata/enums/TSDataType.java | 10 +
.../iotdb/tsfile/read/common/block/TsBlock.java | 20 +-
.../tsfile/read/common/block/TsBlockBuilder.java | 4 +-
.../tsfile/write/writer/LocalTsFileOutput.java | 6 +
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 7 +
.../iotdb/tsfile/write/writer/TsFileOutput.java | 3 +
.../tsfile/write/writer/TestTsFileOutput.java | 5 +
iotdb-protocol/openapi/pom.xml | 2 -
iotdb-protocol/thrift-commons/pom.xml | 1 -
.../thrift-commons/src/main/thrift/common.thrift | 5 +-
iotdb-protocol/thrift-confignode/pom.xml | 1 -
.../src/main/thrift/confignode.thrift | 123 +-
iotdb-protocol/thrift-consensus/pom.xml | 1 -
iotdb-protocol/thrift-datanode/pom.xml | 3 +-
.../src/main/thrift/datanode.thrift | 7 +-
iotdb-protocol/thrift-mlnode/pom.xml | 1 -
.../thrift-mlnode/src/main/thrift/mlnode.thrift | 12 +-
library-udf/pom.xml | 3 -
.../library/series/UDTFConsecutiveSequences.java | 23 +-
.../library/series/UDTFConsecutiveWindows.java | 23 +-
.../iotdb/library/series/util/ConsecutiveUtil.java | 23 +-
pom.xml | 235 +++-
835 files changed, 34774 insertions(+), 10077 deletions(-)
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
index f969a2746a6,00000000000..eaa9f427ecb
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DataPartitionBatchFetcher.java
@@@ -1,70 -1,0 +1,71 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DataPartitionBatchFetcher {
+
+ private final IPartitionFetcher fetcher;
+
+ public DataPartitionBatchFetcher(IPartitionFetcher fetcher) {
+ this.fetcher = fetcher;
+ }
+
+ public List<TRegionReplicaSet> queryDataPartition(
- List<Pair<String, TTimePartitionSlot>> slotList) {
++ List<Pair<String, TTimePartitionSlot>> slotList, String userName) {
+ List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+ int size = slotList.size();
+
+ for (int i = 0; i < size; i += LoadTsFileScheduler.TRANSMIT_LIMIT) {
+ List<Pair<String, TTimePartitionSlot>> subSlotList =
+ slotList.subList(i, Math.min(size, i + LoadTsFileScheduler.TRANSMIT_LIMIT));
- DataPartition dataPartition = fetcher.getOrCreateDataPartition(toQueryParam(subSlotList));
++ DataPartition dataPartition =
++ fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), userName);
+ replicaSets.addAll(
+ subSlotList.stream()
+ .map(pair -> dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right))
+ .collect(Collectors.toList()));
+ }
+ return replicaSets;
+ }
+
+ private List<DataPartitionQueryParam> toQueryParam(List<Pair<String, TTimePartitionSlot>> slots) {
+ return slots.stream()
+ .collect(
+ Collectors.groupingBy(
+ Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
+ .entrySet()
+ .stream()
+ .map(
+ entry -> new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())))
+ .collect(Collectors.toList());
+ }
+}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
index ed3f92ff925,00000000000..36448bd7aa1
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeviceBatchTsFileDataManager.java
@@@ -1,58 -1,0 +1,59 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+
+import java.io.File;
+
+/** Like TsFileDataManager, but one batch (TsFilePieceNode) belongs to the same device. */
+public class DeviceBatchTsFileDataManager extends TsFileDataManager {
+
+ private String currentDeviceId;
+
+ public DeviceBatchTsFileDataManager(
+ DispatchFunction dispatchFunction,
+ PlanNodeId planNodeId,
+ File targetFile,
+ DataPartitionBatchFetcher partitionBatchFetcher,
- long maxMemorySize) {
- super(dispatchFunction, planNodeId, targetFile, partitionBatchFetcher, maxMemorySize);
++ long maxMemorySize,
++ String userName) {
++ super(dispatchFunction, planNodeId, targetFile, partitionBatchFetcher, maxMemorySize, userName);
+ }
+
+ protected boolean addOrSendChunkData(ChunkData chunkData) {
+ if (currentDeviceId != null && !currentDeviceId.equals(chunkData.getDevice())) {
+ // a new device, flush previous data first
+ boolean flushSucceed = flushChunkData(true);
+ if (!flushSucceed) {
+ return false;
+ }
+ }
+ // add the chunk into the batch
+ currentDeviceId = chunkData.getDevice();
+ nonDirectionalChunkData.add(chunkData);
+ dataSize += chunkData.getDataSize();
+ if (dataSize > maxMemorySize) {
+ return flushChunkData(false);
+ }
+
+ return true;
+ }
+}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 6b055c55db5,97542ceab24..5baf1de9314
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@@ -47,8 -45,8 +45,10 @@@ import java.nio.file.DirectoryNotEmptyE
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
++import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
++import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@@ -218,13 -214,8 +218,14 @@@ public class LoadTsFileManager
}
}
+ // method is synchronized because the chunks in a chunk group may be sent in parallel
@SuppressWarnings("squid:S3824")
- private synchronized void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
- private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
++ private synchronized void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
++ throws IOException {
+ // ensure that retransmission will not result in writing duplicated data
+ if (receivedSplitIds.contains(chunkData.getSplitId())) {
+ return;
+ }
if (isClosed) {
throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
index 2f9464c0830,00000000000..08cfc649d3b
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@@ -1,766 -1,0 +1,765 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class MergedTsFileSplitter {
+
+ private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class);
+
+ private final List<File> tsFiles;
+ private final Function<TsFileData, Boolean> consumer;
+ private final PriorityQueue<SplitTask> taskPriorityQueue;
+ private final int maxConcurrentFileNum;
+ private ExecutorService asyncExecutor;
+ private long timePartitionInterval;
+ private AtomicInteger splitIdGenerator = new AtomicInteger();
+ private Statistic statistic = new Statistic();
+
+ public MergedTsFileSplitter(
+ List<File> tsFiles,
+ Function<TsFileData, Boolean> consumer,
+ ExecutorService asyncExecutor,
+ long timePartitionInterval,
+ int maxConcurrentFileNum) {
+ this.tsFiles = tsFiles;
+ this.consumer = consumer;
+ this.asyncExecutor = asyncExecutor;
+ this.timePartitionInterval = timePartitionInterval;
+ this.maxConcurrentFileNum = maxConcurrentFileNum;
+ taskPriorityQueue = new PriorityQueue<>();
+ }
+
+ public void splitTsFileByDataPartition() throws IOException, IllegalStateException {
+ long startTime = System.nanoTime();
+ int i = 0;
+ for (; i < tsFiles.size(); i++) {
+ // only allow at most maxConcurrentFileNum files to be merged at the same time
+ if (taskPriorityQueue.size() > maxConcurrentFileNum) {
+ break;
+ }
+
+ File tsFile = tsFiles.get(i);
+ SplitTask splitTask = new SplitTask(tsFile, asyncExecutor, i);
+ logger.info("Start to split {}", tsFiles.get(i));
+ if (splitTask.hasNext()) {
+ taskPriorityQueue.add(splitTask);
+ }
+ }
+ statistic.initTime = System.nanoTime() - startTime;
+
+ while (!taskPriorityQueue.isEmpty()) {
+ startTime = System.nanoTime();
+ SplitTask task = taskPriorityQueue.poll();
+ TsFileData tsFileData = task.removeNext();
+ statistic.fetchDataTime += System.nanoTime() - startTime;
+ tsFileData.setSplitId(splitIdGenerator.incrementAndGet());
+
+ startTime = System.nanoTime();
+ consumer.apply(tsFileData);
+ statistic.consumeTime += System.nanoTime() - startTime;
+
-
+ startTime = System.nanoTime();
+ if (task.hasNext()) {
+ taskPriorityQueue.add(task);
+ } else {
+ // when a file is exhausted, add the next non-empty file
+ for (; i < tsFiles.size(); i++) {
+ SplitTask splitTask = new SplitTask(tsFiles.get(i), asyncExecutor, i);
+ logger.info("Start to split {}", tsFiles.get(i));
+ if (splitTask.hasNext()) {
+ taskPriorityQueue.add(splitTask);
+ i++;
+ break;
+ }
+ }
+ }
+ statistic.enqueueTime += System.nanoTime() - startTime;
+ }
+ }
+
+ public void close() throws IOException {
- logger.info("Init/FetchData/Consume/Enqueue Time: {}/{}/{}/{}ms"
- , statistic.initTime / 1_000_000L
- , statistic.fetchDataTime / 1_000_000L
- , statistic.consumeTime / 1_000_000L
- , statistic.enqueueTime / 1_000_000L);
++ logger.info(
++ "Init/FetchData/Consume/Enqueue Time: {}/{}/{}/{}ms",
++ statistic.initTime / 1_000_000L,
++ statistic.fetchDataTime / 1_000_000L,
++ statistic.consumeTime / 1_000_000L,
++ statistic.enqueueTime / 1_000_000L);
+ for (SplitTask task : taskPriorityQueue) {
+ task.close();
+ }
+ taskPriorityQueue.clear();
+ logger.info("Splitter closed.");
+ }
+
+ private class Statistic {
+ private long initTime;
+ private long fetchDataTime;
+ private long consumeTime;
+ private long enqueueTime;
+ }
+
+ private class SplitTask implements Comparable<SplitTask> {
+
+ private final File tsFile;
+ private final int fileId;
+ private TsFileSequenceReader reader;
+ private final TreeMap<Long, List<Deletion>> offset2Deletions;
+
+ private String curDevice;
+ private boolean isTimeChunkNeedDecode;
+ private Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData;
+ private Map<Integer, long[]> pageIndex2Times;
+ private Map<Long, IChunkMetadata> offset2ChunkMetadata;
+
+ private BlockingQueue<TsFileData> nextSplits;
+ private TsFileData nextSplit;
+ private byte marker = -1;
+
+ private ExecutorService asyncExecutor;
+ private Future<Void> asyncTask;
+
+ public SplitTask(File tsFile, ExecutorService asyncExecutor, int fileId) throws IOException {
+ this.tsFile = tsFile;
+ this.fileId = fileId;
+ this.asyncExecutor = asyncExecutor;
+ offset2Deletions = new TreeMap<>();
+ init();
+ }
+
+ @Override
+ public int compareTo(SplitTask o) {
+ try {
+ TsFileData thisNext = showNext();
+ TsFileData thatNext = o.showNext();
+ // out put modification first
+ if (thisNext.isModification() && thatNext.isModification()) {
+ return 0;
+ }
+ if (thisNext.isModification()) {
+ return 1;
+ }
+ if (thatNext.isModification()) {
+ return -1;
+ }
+
+ ChunkData thisChunk = (ChunkData) thisNext;
+ ChunkData thatChunk = ((ChunkData) thatNext);
+ Comparator<ChunkData> chunkDataComparator =
+ Comparator.comparing(ChunkData::getDevice, String::compareTo)
+ .thenComparing(ChunkData::firstMeasurement, String::compareTo);
+ int chunkCompare = chunkDataComparator.compare(thisChunk, thatChunk);
+ if (chunkCompare != 0) {
+ return chunkCompare;
+ }
+ return Integer.compare(this.fileId, o.fileId);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void init() throws IOException {
+ this.reader = new TsFileSequenceReader(tsFile.getAbsolutePath());
+ getAllModification(offset2Deletions);
+
+ if (!checkMagic(reader)) {
+ throw new TsFileRuntimeException(
+ String.format("Magic String check error when parsing TsFile %s.", tsFile.getPath()));
+ }
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+
+ curDevice = null;
+ isTimeChunkNeedDecode = true;
+ pageIndex2ChunkData = new HashMap<>();
+ pageIndex2Times = null;
+ offset2ChunkMetadata = new HashMap<>();
+ getChunkMetadata(reader, offset2ChunkMetadata);
+
+ nextSplits = new LinkedBlockingDeque<>(64);
+ if (asyncExecutor != null) {
+ asyncTask =
+ asyncExecutor.submit(
+ () -> {
+ try {
+ asyncComputeNext();
+ } catch (Throwable e) {
+ logger.info("Exception during splitting", e);
+ throw e;
+ }
+ return null;
+ });
+ }
+ }
+
+ private void asyncComputeNext() throws IOException {
+ while (reader != null && !Thread.interrupted()) {
+ computeNext();
+ }
+ logger.info("{} ends splitting", tsFile);
+ }
+
+ public boolean hasNext() throws IOException {
+ if (nextSplit != null && !(nextSplit instanceof EmptyTsFileData)) {
+ return true;
+ }
+ if (reader == null && nextSplits.isEmpty()) {
+ return false;
+ }
+
+ if (asyncExecutor == null) {
+ computeNext();
+ if (!nextSplits.isEmpty()) {
+ nextSplit = nextSplits.poll();
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ try {
+ nextSplit = nextSplits.take();
+ } catch (InterruptedException e) {
+ return false;
+ }
+ return !(nextSplit instanceof EmptyTsFileData);
+ }
+ }
+
+ public TsFileData removeNext() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ TsFileData split = nextSplit;
+ nextSplit = null;
+ return split;
+ }
+
+ public TsFileData showNext() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return nextSplit;
+ }
+
+ public void close() throws IOException { // Cancels the async producer task (if any), waits for it, then closes and nulls the reader. Also called by computeNext() at end of data.
+ if (asyncTask != null) {
+ asyncTask.cancel(true); // NOTE(review): when reached from computeNext() on the producer thread, this cancels the running task and interrupts its own thread.
+ try {
+ asyncTask.get();
+ } catch (CancellationException ignored) {
+ // expected when the cancel() above took effect
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e); // NOTE(review): throwing here skips reader.close() below, leaking the reader if the task had failed; consider a finally.
+ }
+ }
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
+ private byte nextMarker() throws IOException { // Returns the pending marker, reading a new one only after computeNext() has reset `marker` to -1.
+ if (marker != -1) {
+ // computeNext() returned early without consuming this marker
+ // (e.g. to flush pending aligned chunk data first), so resume with it
+ return marker;
+ }
+ return marker = reader.readMarker();
+ }
+
+ private void insertNewChunk(ChunkData chunkData) throws IOException { // Enqueues a completed ChunkData split: non-blocking add() in sync mode, blocking put() in async mode.
+ if (asyncExecutor == null) {
+ nextSplits.add(chunkData); // NOTE(review): nextSplits is bounded (capacity 64); add() throws IllegalStateException when full, e.g. when a single chunk is split across more than 64 time partitions before computeNext() returns.
+ } else {
+ try {
+ nextSplits.put(chunkData); // blocks while the queue is full
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Sonar suppressions for this long method (presumably complexity rules)
+ private void computeNext() throws IOException, IllegalStateException { // Parses markers until at least one split is queued in nextSplits or SEPARATOR is reached; at the end it flushes pending aligned data and deletions, closes the splitter and, in async mode, enqueues the EmptyTsFileData end marker.
+ if (reader == null) { // already finished and closed
+ return;
+ }
+
+ while (nextMarker() != MetaMarker.SEPARATOR) { // SEPARATOR ends the chunk-data section parsed here
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: // non-aligned chunk or time chunk of an aligned series
+ long chunkOffset = reader.position();
+ boolean chunkDataGenerated = // a new chunk starts: flush AlignedChunkData collected for the previous aligned group
+ consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
+ handleModification(offset2Deletions, chunkOffset); // emit deletions recorded at or before this chunk
+ if (chunkDataGenerated) { // deliver flushed splits first; marker is kept, so this chunk is parsed on the next call
+ return;
+ }
+
+ ChunkHeader header = reader.readChunkHeader(marker);
+ if (header.getDataSize() == 0) {
+ throw new TsFileRuntimeException(
+ String.format(
+ "Empty Nonaligned Chunk or Time Chunk with offset %d in TsFile %s.",
+ chunkOffset, tsFile.getPath()));
+ }
+
+ boolean isAligned = // a time chunk (TIME_COLUMN_MASK set) starts an aligned group
+ ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK);
+ IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); // metadata is keyed by the header offset, which presumably points at the marker byte already consumed
+ TTimePartitionSlot timePartitionSlot =
- TimePartitionUtils.getTimePartition(
++ TimePartitionUtils.getTimePartitionSlot(
+ chunkMetadata.getStartTime(), timePartitionInterval);
+ ChunkData chunkData =
+ ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
+
+ if (!needDecodeChunk(chunkMetadata)) { // whole chunk lies in one time partition: copy it without decoding
+ chunkData.setNotDecode();
+ chunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()), chunkMetadata);
+ if (isAligned) {
+ isTimeChunkNeedDecode = false;
+ pageIndex2ChunkData
+ .computeIfAbsent(1, o -> new ArrayList<>()) // fixed key 1 holds the undecoded time chunk; value chunks read it back via get(1)
+ .add((AlignedChunkData) chunkData);
+ } else {
+ insertNewChunk(chunkData);
+ }
+ break;
+ }
+
+ Decoder defaultTimeDecoder = // time column encoding comes from the TsFile config, not the chunk header
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+ int dataSize = header.getDataSize();
+ int pageIndex = 0;
+ if (isAligned) { // new aligned group: decoded times are cached per page for the value chunks
+ isTimeChunkNeedDecode = true;
+ pageIndex2Times = new HashMap<>();
+ }
+
+ while (dataSize > 0) { // iterate over the chunk's pages
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); // presumably true for multi-page chunks, whose page headers carry statistics - verify
+ long pageDataSize = pageHeader.getSerializedPageSize();
+ if (!needDecodePage(pageHeader, chunkMetadata)) { // page lies in one time partition: copy it as-is
+ long startTime =
+ pageHeader.getStatistics() == null // no page statistics: fall back to the chunk's start time
+ ? chunkMetadata.getStartTime()
+ : pageHeader.getStartTime();
+ TTimePartitionSlot pageTimePartitionSlot =
- TimePartitionUtils.getTimePartition(startTime, timePartitionInterval);
++ TimePartitionUtils.getTimePartitionSlot(startTime, timePartitionInterval);
+ if (!timePartitionSlot.equals(pageTimePartitionSlot)) { // page belongs to another partition: close the current ChunkData and start a new one
+ if (!isAligned) {
+ insertNewChunk(chunkData);
+ }
+ timePartitionSlot = pageTimePartitionSlot;
+ chunkData =
+ ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
+ }
+ if (isAligned) {
+ pageIndex2ChunkData
+ .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+ .add((AlignedChunkData) chunkData);
+ }
+ chunkData.writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader));
+ } else { // page spans partitions: decode it and split the points by time
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ Pair<long[], Object[]> tvArray =
+ decodePage(
+ isAligned, pageData, pageHeader, defaultTimeDecoder, valueDecoder, header);
+ long[] times = tvArray.left;
+ Object[] values = tvArray.right;
+ if (isAligned) {
+ pageIndex2Times.put(pageIndex, times); // reused when decoding the value pages of this group
+ }
+
+ int start = 0;
+ long endTime = timePartitionSlot.getStartTime() + timePartitionInterval; // exclusive end of the current partition
+ for (int i = 0; i < times.length; i++) {
+ if (times[i] >= endTime) { // point i falls past the current partition: flush points [start, i)
+ chunkData.writeDecodePage(times, values, start, i);
+ if (isAligned) {
+ pageIndex2ChunkData
+ .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+ .add((AlignedChunkData) chunkData);
+ } else {
+ insertNewChunk(chunkData);
+ }
+
+ timePartitionSlot =
- TimePartitionUtils.getTimePartition(times[i], timePartitionInterval);
++ TimePartitionUtils.getTimePartitionSlot(times[i], timePartitionInterval);
+ endTime = timePartitionSlot.getStartTime() + timePartitionInterval;
+ chunkData =
+ ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
+ start = i;
+ }
+ }
+ chunkData.writeDecodePage(times, values, start, times.length); // remaining points of the page
+ if (isAligned) {
+ pageIndex2ChunkData
+ .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+ .add((AlignedChunkData) chunkData);
+ }
+ }
+
+ pageIndex += 1;
+ dataSize -= pageDataSize;
+ }
+
+ if (!isAligned) { // aligned ChunkData stays in pageIndex2ChunkData until its value chunks are added
+ insertNewChunk(chunkData);
+ }
+ break;
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: // value chunk of the current aligned group
+ chunkOffset = reader.position();
+ chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
+ header = reader.readChunkHeader(marker);
+ if (header.getDataSize() == 0) { // empty value chunk: register its header only
+ handleEmptyValueChunk(
+ header, pageIndex2ChunkData, chunkMetadata, isTimeChunkNeedDecode);
+ break;
+ }
+
+ if (!isTimeChunkNeedDecode) { // time chunk was copied whole, so copy this value chunk whole too
+ AlignedChunkData alignedChunkData = pageIndex2ChunkData.get(1).get(0);
+ alignedChunkData.addValueChunk(header);
+ alignedChunkData.writeEntireChunk(
+ reader.readChunk(-1, header.getDataSize()), chunkMetadata);
+ break;
+ }
+
+ Set<ChunkData> allChunkData = new HashSet<>(); // ChunkData that already received this value chunk's header
+ dataSize = header.getDataSize();
+ pageIndex = 0;
+ valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+
+ while (dataSize > 0) {
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+ List<AlignedChunkData> alignedChunkDataList = pageIndex2ChunkData.get(pageIndex); // value pages line up with the time chunk's pages by index
+ for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
+ if (!allChunkData.contains(alignedChunkData)) {
+ alignedChunkData.addValueChunk(header);
+ allChunkData.add(alignedChunkData);
+ }
+ }
+ if (alignedChunkDataList.size() == 1) { // page maps to a single ChunkData: copy it without decoding
+ // copy the compressed page bytes as-is
+ alignedChunkDataList
+ .get(0)
+ .writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader));
+ } else { // time page was split: decode values and distribute them using the cached times
+ long[] times = pageIndex2Times.get(pageIndex);
+ TsPrimitiveType[] values =
+ decodeValuePage(reader, header, pageHeader, times, valueDecoder);
+ for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
+ alignedChunkData.writeDecodeValuePage(times, values, header.getDataType());
+ }
+ }
+ long pageDataSize = pageHeader.getSerializedPageSize();
+ pageIndex += 1;
+ dataSize -= pageDataSize;
+ }
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ curDevice = chunkGroupHeader.getDeviceID(); // subsequent chunks belong to this device
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ reader.readPlanIndex(); // read and discarded
+ break;
+ default:
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ marker = -1; // marker fully processed; nextMarker() will read a new one
+ if (!nextSplits.isEmpty()) { // hand control back as soon as splits are available
+ return;
+ }
+ }
+
+ consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData); // end of data: flush the last aligned group
+ handleModification(offset2Deletions, Long.MAX_VALUE); // and every remaining deletion
+ close(); // NOTE(review): in async mode this runs on the producer thread and cancels asyncTask, interrupting this very thread; if nextSplits is full, the put() below can then throw and the end marker is lost.
+ if (asyncExecutor != null) {
+ try {
+ nextSplits.put(new EmptyTsFileData()); // end-of-stream marker for hasNext()
+ } catch (InterruptedException e) {
+ asyncTask.cancel(true);
+ }
+ }
+ }
+
+ private class EmptyTsFileData implements TsFileData { // End-of-stream sentinel enqueued in async mode; carries no data. NOTE(review): uses no outer state, so it could be a static nested class.
+
+ @Override
+ public long getDataSize() {
+ return 0;
+ }
+
+ @Override
- public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
- }
++ public void writeToFileWriter(TsFileIOWriter writer) throws IOException {}
+
+ @Override
+ public boolean isModification() {
+ return false;
+ }
+
+ @Override
- public void serialize(DataOutputStream stream) throws IOException {
- }
++ public void serialize(DataOutputStream stream) throws IOException {}
+
+ @Override
+ public int getSplitId() {
+ return 0;
+ }
+
+ @Override
- public void setSplitId(int sid) {
- }
++ public void setSplitId(int sid) {}
+ }
+
+ private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) throws IOException { // Loads the TsFile's companion modification file (path + ModificationFile.FILE_SUFFIX) and groups its entries by recorded file offset.
+ try (ModificationFile modificationFile =
+ new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
+ for (Modification modification : modificationFile.getModifications()) {
+ offset2Deletions
+ .computeIfAbsent(modification.getFileOffset(), o -> new ArrayList<>())
+ .add((Deletion) modification); // NOTE(review): assumes every Modification is a Deletion; any other subtype would raise ClassCastException.
+ }
+ }
+ }
+
+ private boolean checkMagic(TsFileSequenceReader reader) throws IOException { // Validates head magic string, version number and tail magic string; logs the first mismatch and returns false.
+ String magic = reader.readHeadMagic();
+ if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+ logger.error("the file's MAGIC STRING is incorrect, file path: {}", reader.getFileName());
+ return false;
+ }
+
+ byte versionNumber = reader.readVersionNumber();
+ if (versionNumber != TSFileConfig.VERSION_NUMBER) {
+ logger.error("the file's Version Number is incorrect, file path: {}", reader.getFileName());
+ return false;
+ }
+
+ if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { // a missing tail magic is reported as an unclosed file
+ logger.error("the file is not closed correctly, file path: {}", reader.getFileName());
+ return false;
+ }
+ return true;
+ }
+
+ private void getChunkMetadata( // Indexes every chunk's metadata by its header offset so computeNext() can look it up from the reader position.
+ TsFileSequenceReader reader, Map<Long, IChunkMetadata> offset2ChunkMetadata)
+ throws IOException {
+ Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true); // presumably `true` requests the chunk metadata lists used below - verify
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
+ for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+ for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
+ offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), chunkMetadata);
+ }
+ }
+ }
+ }
+
+ private void handleModification( // Emits DeletionData for every deletion recorded at an offset <= chunkOffset, in ascending offset order, removing them from the map.
+ TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) {
+ while (!offset2Deletions.isEmpty() && offset2Deletions.firstEntry().getKey() <= chunkOffset) {
+ offset2Deletions
+ .pollFirstEntry()
+ .getValue()
+ .forEach(o -> nextSplits.add(new DeletionData(o))); // NOTE(review): add() on the bounded nextSplits throws IllegalStateException when full, unlike the blocking put() insertNewChunk() uses in async mode.
+ }
+ }
+
+ private boolean consumeAllAlignedChunkData( // Enqueues every distinct AlignedChunkData of the current aligned group and clears the map; returns true if anything was flushed.
+ long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) { // NOTE(review): `offset` is unused.
+ if (pageIndex2ChunkData.isEmpty()) {
+ return false;
+ }
+
+ Set<ChunkData> allChunkData = new HashSet<>(); // one AlignedChunkData may be registered under several page indices; dedupe
+ for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) {
+ allChunkData.addAll(entry.getValue());
+ }
+ for (ChunkData chunkData : allChunkData) {
+ try {
+ nextSplits.put(chunkData); // NOTE(review): in sync mode this runs on the consumer's own thread, so put() on a full queue would block forever.
+ } catch (InterruptedException e) { // NOTE(review): interrupt is swallowed (not restored) and the map is left uncleared.
+ return false;
+ }
+ }
+ pageIndex2ChunkData.clear();
+ return true;
+ }
+
+ private boolean needDecodeChunk(IChunkMetadata chunkMetadata) { // True when the chunk's start and end times fall in different time partitions, so it must be decoded and split.
- return !TimePartitionUtils.getTimePartition(
++ return !TimePartitionUtils.getTimePartitionSlot(
+ chunkMetadata.getStartTime(), timePartitionInterval)
+ .equals(
- TimePartitionUtils.getTimePartition(
++ TimePartitionUtils.getTimePartitionSlot(
+ chunkMetadata.getEndTime(), timePartitionInterval));
+ }
+
+ private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetadata) { // True when the page's time range spans more than one time partition.
+ if (pageHeader.getStatistics() == null) { // no page statistics: use the chunk's time range instead
- return !TimePartitionUtils.getTimePartition(
++ return !TimePartitionUtils.getTimePartitionSlot(
+ chunkMetadata.getStartTime(), timePartitionInterval)
+ .equals(
- TimePartitionUtils.getTimePartition(
++ TimePartitionUtils.getTimePartitionSlot(
+ chunkMetadata.getEndTime(), timePartitionInterval));
+ }
- return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime(), timePartitionInterval)
++ return !TimePartitionUtils.getTimePartitionSlot(
++ pageHeader.getStartTime(), timePartitionInterval)
+ .equals(
- TimePartitionUtils.getTimePartition(pageHeader.getEndTime(), timePartitionInterval));
++ TimePartitionUtils.getTimePartitionSlot(
++ pageHeader.getEndTime(), timePartitionInterval));
+ }
+
+ private Pair<long[], Object[]> decodePage( // Decodes one page into parallel time/value arrays; for aligned (time) pages only times are decoded and the value array is left null-filled.
+ boolean isAligned,
+ ByteBuffer pageData,
+ PageHeader pageHeader,
+ Decoder timeDecoder,
+ Decoder valueDecoder,
+ ChunkHeader chunkHeader)
+ throws IOException {
+ if (isAligned) {
+ TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder);
+ long[] times = timePageReader.getNextTimeBatch();
+ return new Pair<>(times, new Object[times.length]);
+ }
+
+ valueDecoder.reset(); // the decoder is reused across pages of the chunk
+ PageReader pageReader =
+ new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null); // presumably null = no filter - verify
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ long[] times = new long[batchData.length()];
+ Object[] values = new Object[batchData.length()];
+ int index = 0;
+ while (batchData.hasCurrent()) {
+ times[index] = batchData.currentTime();
+ values[index++] = batchData.currentValue();
+ batchData.next();
+ }
+ return new Pair<>(times, values);
+ }
+
+ private void handleEmptyValueChunk( // Registers a 0-byte value chunk header on every distinct AlignedChunkData of the current group; if the time chunk was copied undecoded, also writes an empty chunk body.
+ ChunkHeader header,
+ Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData,
+ IChunkMetadata chunkMetadata,
+ boolean isTimeChunkNeedDecode)
+ throws IOException {
+ Set<ChunkData> allChunkData = new HashSet<>(); // an AlignedChunkData may appear under several page indices; handle each once
+ for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) {
+ for (AlignedChunkData alignedChunkData : entry.getValue()) {
+ if (!allChunkData.contains(alignedChunkData)) {
+ alignedChunkData.addValueChunk(header);
+ if (!isTimeChunkNeedDecode) {
+ alignedChunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+ }
+ allChunkData.add(alignedChunkData);
+ }
+ }
+ }
+ }
+
+ private TsPrimitiveType[] decodeValuePage( // Decodes one value page against its time column; an empty page yields an all-null array of the same length.
+ TsFileSequenceReader reader,
+ ChunkHeader chunkHeader,
+ PageHeader pageHeader,
+ long[] times,
+ Decoder valueDecoder)
+ throws IOException {
+ if (pageHeader.getSerializedPageSize() == 0) {
+ return new TsPrimitiveType[times.length];
+ }
+
+ valueDecoder.reset(); // the decoder is reused across pages of the chunk
+ ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType());
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
+ return valuePageReader.nextValueBatch(
+ times); // should be origin time, so recording satisfied length is necessary
+ }
+ }
+}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
index bf1c79172b3,00000000000..fa5aa285ed7
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileDataManager.java
@@@ -1,169 -1,0 +1,173 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * TsFileDataManager batches splits generated by TsFileSplitter into LoadTsFilePieceNodes, routes the
+ * splits to their associated replica sets, and sends them to the replicas with the provided
+ * dispatching function. Buffered data is flushed once it exceeds {@code maxMemorySize}.
+ */
+@SuppressWarnings("BooleanMethodIsAlwaysInverted")
+public class TsFileDataManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(TsFileDataManager.class);
+ protected final long maxMemorySize; // buffered dataSize above which addOrSendChunkData() triggers a flush
+ protected final DispatchFunction dispatchFunction;
+ private final DataPartitionBatchFetcher partitionBatchFetcher;
+ protected final PlanNodeId planNodeId;
+ protected final File targetFile;
+
+ protected long dataSize; // bytes buffered in replicaSet2Piece and nonDirectionalChunkData
+ protected final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece; // pending piece node per target replica set
+ protected final List<ChunkData> nonDirectionalChunkData; // chunk data not yet routed to a replica set
++ protected final String userName; // passed to queryDataPartition() in routeChunkData()
+
+ @FunctionalInterface
+ public interface DispatchFunction {
+
+ boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet); // returning false aborts the current flush/send
+ }
+
+ public TsFileDataManager(
+ DispatchFunction dispatchFunction,
+ PlanNodeId planNodeId,
+ File targetFile,
+ DataPartitionBatchFetcher partitionBatchFetcher,
- long maxMemorySize) {
++ long maxMemorySize,
++ String userName) {
+ this.dispatchFunction = dispatchFunction;
+ this.planNodeId = planNodeId;
+ this.targetFile = targetFile;
+ this.dataSize = 0;
+ this.replicaSet2Piece = new HashMap<>();
+ this.nonDirectionalChunkData = new ArrayList<>();
+ this.partitionBatchFetcher = partitionBatchFetcher;
+ this.maxMemorySize = maxMemorySize;
++ this.userName = userName;
+ }
+
+ public boolean addOrSendTsFileData(TsFileData tsFileData) { // Buffers one split (deletion or chunk); returns false if a triggered dispatch failed.
+ return tsFileData.isModification()
+ ? addOrSendDeletionData(tsFileData)
+ : addOrSendChunkData((ChunkData) tsFileData);
+ }
+
+ protected boolean addOrSendChunkData(ChunkData chunkData) { // Buffers chunk data; once dataSize exceeds maxMemorySize, flushes the largest pieces until back under the limit.
+ nonDirectionalChunkData.add(chunkData);
+ dataSize += chunkData.getDataSize();
+
+ if (dataSize > maxMemorySize) {
+ return flushChunkData(false);
+ }
+
+ return true;
+ }
+
+ protected boolean flushChunkData(boolean flushAll) { // Routes pending chunks, then dispatches pieces largest-first; stops once dataSize <= maxMemorySize unless flushAll. Returns false on dispatch failure.
+ routeChunkData();
+
+ // start to dispatch from the biggest TsFilePieceNode
+ List<TRegionReplicaSet> sortedReplicaSets =
+ replicaSet2Piece.keySet().stream()
+ .sorted(Comparator.comparingLong(o -> replicaSet2Piece.get(o).getDataSize()).reversed())
+ .collect(Collectors.toList());
+
+ for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
+ LoadTsFilePieceNode pieceNode = replicaSet2Piece.get(sortedReplicaSet);
+ if (pieceNode.getDataSize() == 0) { // sorted descending, so all remaining pieces are empty as well
+ break;
+ }
+ if (!dispatchFunction.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
+ return false;
+ }
+
+ dataSize -= pieceNode.getDataSize();
+ replicaSet2Piece.put(
+ sortedReplicaSet,
+ new LoadTsFilePieceNode(
+ planNodeId, targetFile)); // keep an empty piece instead of removing the entry: deletions are broadcast to every replica set in replicaSet2Piece
+ if (!flushAll && dataSize <= maxMemorySize) {
+ break;
+ }
+ }
+ return true;
+ }
+
+ protected void routeChunkData() { // Resolves each pending chunk's replica set in one batch query and appends the chunk to that set's piece node.
+ if (nonDirectionalChunkData.isEmpty()) {
+ return;
+ }
+
+ List<TRegionReplicaSet> replicaSets =
+ partitionBatchFetcher.queryDataPartition(
+ nonDirectionalChunkData.stream()
+ .map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
- .collect(Collectors.toList()));
++ .collect(Collectors.toList()),
++ userName);
+ IntStream.range(0, nonDirectionalChunkData.size()) // assumes replicaSets is parallel to the query input (one result per chunk, same order)
+ .forEach(
+ i ->
+ replicaSet2Piece
+ .computeIfAbsent(
+ replicaSets.get(i), o -> new LoadTsFilePieceNode(planNodeId, targetFile))
+ .addTsFileData(nonDirectionalChunkData.get(i)));
+ nonDirectionalChunkData.clear();
+ }
+
+ private boolean addOrSendDeletionData(TsFileData deletionData) { // Broadcasts the deletion to every replica set seen so far. NOTE(review): dataSize grows here but is never checked against maxMemorySize.
+ routeChunkData(); // ensure chunk data will be added before deletion
+
+ for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
+ dataSize += deletionData.getDataSize(); // counted once per replica set, matching the per-piece subtraction in flushChunkData()
+ entry.getValue().addTsFileData(deletionData);
+ }
+ return true;
+ }
+
+ public boolean sendAllTsFileData() { // Routes remaining chunks and dispatches every non-empty piece; returns false on the first failure. NOTE(review): pieces are not cleared afterwards, so a second call would resend them.
+ routeChunkData();
+
+ for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {
+ if (entry.getValue().getDataSize() > 0
+ && !dispatchFunction.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
+ logger.warn("Dispatch piece node {} of TsFile {} error.", entry.getValue(), targetFile);
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index 2f09a003106,00000000000..d6f0ac87e79
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@@ -1,464 -1,0 +1,474 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
- import java.util.ArrayDeque;
- import java.util.Queue;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationStatistics;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.ThroughputBasedLocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.nodesplit.ClusteringMeasurementSplitter;
- import org.apache.iotdb.db.queryengine.execution.load.nodesplit.OrderedMeasurementSplitter;
+import org.apache.iotdb.db.queryengine.execution.load.nodesplit.PieceNodeSplitter;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
++import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
++import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
++import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileDispatcherImpl.NODE_CONNECTION_ERROR;
+
+public class TsFileSplitSender {
+
+ private static final int MAX_RETRY = 5; // attempts per data node in secondPhase()
+ private static final long RETRY_INTERVAL_MS = 6_000L; // pause between those attempts
+ private static final int MAX_PENDING_PIECE_NODE = 5; // initial capacity hint for splitFutures
+ private static final Logger logger = LoggerFactory.getLogger(TsFileSplitSender.class);
+
+ private LoadTsFileNode loadTsFileNode;
+ private DataPartitionBatchFetcher targetPartitionFetcher;
+ private long targetPartitionInterval;
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+ // All consensus groups accessed in Phase1 should be notified in Phase2
+ private final Set<TRegionReplicaSet> allReplicaSets = new ConcurrentSkipListSet<>();
+ private String uuid;
+ private LocationStatistics locationStatistics = new LocationStatistics();
+ private boolean isGeneratedByPipe;
+ private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception> phaseOneFailures = // phase-one dispatch failures, keyed by piece node and target replica set
+ new ConcurrentHashMap<>();
+ private Map<TRegionReplicaSet, Exception> phaseTwoFailures = new HashMap<>(); // load-command failures per replica set
+ private long maxSplitSize; // forwarded as maxMemorySize of the TsFileDataManager
+ private PieceNodeSplitter pieceNodeSplitter = new ClusteringMeasurementSplitter(1.0, 10); // splits piece nodes before dispatch
- // private PieceNodeSplitter pieceNodeSplitter = new OrderedMeasurementSplitter();
++ // private PieceNodeSplitter pieceNodeSplitter = new OrderedMeasurementSplitter();
+ private CompressionType compressionType = CompressionType.LZ4; // codec used by compressBuffer()
+ private Statistic statistic = new Statistic();
+ private ExecutorService splitNodeService; // runs pieceNodeSplitter asynchronously
+ private Queue<Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet>> splitFutures; // pending split results with their target replica sets, consumed in FIFO order
+ private int maxConcurrentFileNum; // forwarded to MergedTsFileSplitter
++ private String userName; // forwarded to the TsFileDataManager for partition queries
+
+ public TsFileSplitSender( // Captures load parameters; userName is forwarded to partition queries.
+ LoadTsFileNode loadTsFileNode,
+ DataPartitionBatchFetcher targetPartitionFetcher,
+ long targetPartitionInterval,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+ boolean isGeneratedByPipe,
+ long maxSplitSize,
- int maxConcurrentFileNum) {
++ int maxConcurrentFileNum,
++ String userName) {
+ this.loadTsFileNode = loadTsFileNode;
+ this.targetPartitionFetcher = targetPartitionFetcher;
+ this.targetPartitionInterval = targetPartitionInterval;
+ this.internalServiceClientManager = internalServiceClientManager;
+ this.isGeneratedByPipe = isGeneratedByPipe;
+ this.maxSplitSize = maxSplitSize;
+ this.splitNodeService = IoTDBThreadPoolFactory.newCachedThreadPool("SplitLoadTsFilePieceNode"); // NOTE(review): a new pool per sender; no shutdown is visible in this chunk - verify it is shut down.
+ this.splitFutures = new ArrayDeque<>(MAX_PENDING_PIECE_NODE);
+ this.maxConcurrentFileNum = maxConcurrentFileNum;
++ this.userName = userName;
+ }
+
+ public void start() throws IOException { // Runs the two-phase load: phase 1 splits and dispatches the data, phase 2 tells every touched replica set to EXECUTE (or ROLLBACK if phase 1 failed); then logs statistics.
+ statistic.taskStartTime = System.currentTimeMillis();
+ // skip files without data
+ loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
+ uuid = UUID.randomUUID().toString();
+
+ boolean isFirstPhaseSuccess = firstPhase(loadTsFileNode);
+ boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess);
+ if (isFirstPhaseSuccess && isSecondPhaseSuccess) {
+ logger.info("Load TsFiles {} Successfully", loadTsFileNode.getResources());
+ } else {
+ logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
+ }
+ statistic.taskEndTime = System.currentTimeMillis();
+ locationStatistics.logLocationStatistics();
+ statistic.logStatistic();
+ }
+
+ private boolean firstPhase(LoadTsFileNode node) throws IOException { // Splits all TsFiles of the node by data partition and dispatches the pieces; returns true only if every dispatch succeeded.
+ long start = System.currentTimeMillis();
+ TsFileDataManager tsFileDataManager =
+ new DeviceBatchTsFileDataManager(
+ this::dispatchOnePieceNode,
+ node.getPlanNodeId(),
+ node.lastResource().getTsFile(),
+ targetPartitionFetcher,
- maxSplitSize);
++ maxSplitSize,
++ userName);
+
+ ExecutorService executorService = // NOTE(review): never shut down in this method; consider shutting it down once splitting ends.
+ IoTDBThreadPoolFactory.newThreadPool(
+ 32,
+ Integer.MAX_VALUE,
+ 20,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory("MergedTsFileSplitter"),
+ "MergedTsFileSplitter");
+ MergedTsFileSplitter splitter =
+ new MergedTsFileSplitter(
+ node.getResources().stream()
+ .map(TsFileResource::getTsFile)
+ .collect(Collectors.toList()),
+ tsFileDataManager::addOrSendTsFileData,
+ executorService,
+ targetPartitionInterval,
+ maxConcurrentFileNum);
+ splitter.splitTsFileByDataPartition(); // NOTE(review): if this throws, splitter.close() below is skipped; consider try/finally.
+ splitter.close();
+ logger.info("Split ends after {}ms", System.currentTimeMillis() - start);
- boolean success = tsFileDataManager.sendAllTsFileData() && processRemainingPieceNodes()
- && phaseOneFailures.isEmpty();
++ boolean success =
++ tsFileDataManager.sendAllTsFileData() // flush buffered data first
++ && processRemainingPieceNodes() // then wait for and dispatch pending split results
++ && phaseOneFailures.isEmpty();
+ logger.info("Cleanup ends after {}ms", System.currentTimeMillis() - start);
+ return success;
+ }
+
+ private boolean secondPhase(boolean isFirstPhaseSuccess) { // Sends the EXECUTE/ROLLBACK load command for uuid to each replica set touched in phase 1, trying its data nodes in turn; returns true if no failure was recorded.
+ logger.info("Start dispatching Load command for uuid {}", uuid);
+ TLoadCommandReq loadCommandReq =
+ new TLoadCommandReq(
+ (isFirstPhaseSuccess ? LoadCommand.EXECUTE : LoadCommand.ROLLBACK).ordinal(), uuid);
+ loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
+
+ for (TRegionReplicaSet replicaSet : allReplicaSets) {
+ loadCommandReq.setUseConsensus(true);
+ for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
+ TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
+ Exception groupException = null; // NOTE(review): not reset per retry, so an earlier connection error survives a later successful attempt.
+ loadCommandReq.setConsensusGroupId(replicaSet.getRegionId());
+
+ for (int i = 0; i < MAX_RETRY; i++) {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ groupException = new FragmentInstanceDispatchException(loadResp.status);
+ }
+ break; // got a response (accepted or not): stop retrying this node
+ } catch (ClientManagerException | TException e) {
+ logger.warn(NODE_CONNECTION_ERROR, endPoint, e);
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(
+ "can't connect to node {}, please reset longer dn_connection_timeout_ms "
+ + "in iotdb-common.properties and restart iotdb."
+ + endPoint); // NOTE(review): "{}" is never substituted and endPoint is appended without a separator.
+ groupException = new FragmentInstanceDispatchException(status);
+ }
+ try { // reached only after a connection error; also sleeps after the final attempt
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ groupException = e; // NOTE(review): interrupt status is not restored.
+ break;
+ }
+ }
+
+ if (groupException != null) { // NOTE(review): a failure recorded here is kept even if a later data node of this replica set succeeds.
+ phaseTwoFailures.put(replicaSet, groupException);
+ } else {
+ break; // this node accepted the command; skip the remaining replicas
+ }
+ }
+ }
+
+ return phaseTwoFailures.isEmpty();
+ }
+
+ public LocationSequencer createLocationSequencer(TRegionReplicaSet replicaSet) { // Orders a replica set's data nodes for dispatch using observed throughput; the commented lines are alternative strategies.
+ // return new FixedLocationSequencer(replicaSet);
+ // return new RandomLocationSequencer(replicaSet);
+ return new ThroughputBasedLocationSequencer(replicaSet, locationStatistics);
+ }
+
+ private ByteBuffer compressBuffer(ByteBuffer buffer) throws IOException { // Compresses a serialized piece node with compressionType (returned unchanged for UNCOMPRESSED) and records raw/compressed sizes in statistic.
+ statistic.rawSize.addAndGet(buffer.remaining());
+ if (compressionType.equals(CompressionType.UNCOMPRESSED)) {
+ statistic.compressedSize.addAndGet(buffer.remaining());
+ return buffer;
+ }
+ ICompressor compressor = ICompressor.getCompressor(compressionType);
+ int maxBytesForCompression = compressor.getMaxBytesForCompression(buffer.remaining()) + 1;
+ ByteBuffer compressed = ByteBuffer.allocate(maxBytesForCompression);
- int compressLength = compressor.compress(buffer.array(),
- buffer.arrayOffset() + buffer.position(), buffer.remaining(), compressed.array());
++ int compressLength =
++ compressor.compress(
++ buffer.array(), // assumes a heap (array-backed) buffer
++ buffer.arrayOffset() + buffer.position(),
++ buffer.remaining(),
++ compressed.array());
+ compressed.limit(compressLength);
+ statistic.compressedSize.addAndGet(compressLength);
+ return compressed;
+ }
+
+ private Future<List<LoadTsFilePieceNode>> submitSplitPieceNode(LoadTsFilePieceNode pieceNode) {
+ return splitNodeService.submit(() -> pieceNodeSplitter.split(pieceNode));
+ }
+
+ private boolean processRemainingPieceNodes() {
+ List<LoadTsFilePieceNode> subNodes;
+ for (Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair : splitFutures) {
+ try {
+ subNodes = pair.left.get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Unexpected error during splitting node", e);
+ return false;
+ }
+ if (!dispatchPieceNodes(subNodes, pair.right)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
- private boolean dispatchPieceNodes(List<LoadTsFilePieceNode> subNodes,
- TRegionReplicaSet replicaSet) {
++ private boolean dispatchPieceNodes(
++ List<LoadTsFilePieceNode> subNodes, TRegionReplicaSet replicaSet) {
+ AtomicLong minDispatchTime = new AtomicLong(Long.MAX_VALUE);
+ AtomicLong maxDispatchTime = new AtomicLong(Long.MIN_VALUE);
+ AtomicLong sumDispatchTime = new AtomicLong();
+ AtomicLong sumCompressingTime = new AtomicLong();
+
+ long start = System.nanoTime();
+ List<Boolean> subNodeResults =
+ subNodes.stream()
+ .parallel()
+ .map(
+ node -> {
+ ByteBuffer buffer;
+ long startTime = System.nanoTime();
+ int uncompressedLength;
+ try {
+ buffer = node.serializeToByteBuffer();
+ uncompressedLength = buffer.remaining();
+ buffer = compressBuffer(buffer);
+ } catch (IOException e) {
+ phaseOneFailures.put(new Pair<>(node, replicaSet), e);
+ return false;
+ }
+ long compressingTime = System.nanoTime() - startTime;
+ sumCompressingTime.addAndGet(compressingTime);
+ statistic.compressingTime.addAndGet(compressingTime);
+
+ TTsFilePieceReq loadTsFileReq =
+ new TTsFilePieceReq(buffer, uuid, replicaSet.getRegionId());
+ loadTsFileReq.isRelay = true;
+ loadTsFileReq.setCompressionType(compressionType.serialize());
+ loadTsFileReq.setUncompressedLength(uncompressedLength);
+ LocationSequencer locationSequencer = createLocationSequencer(replicaSet);
+
+ boolean loadSucceed = false;
+ Exception lastConnectionError = null;
+ TDataNodeLocation currLocation = null;
+ for (TDataNodeLocation location : locationSequencer) {
+ if (location.getDataNodeId() == 0 && logger.isDebugEnabled()) {
+ locationStatistics.logLocationStatistics();
+ logger.info("Chose location {}", location.getDataNodeId());
+ }
+ currLocation = location;
+ startTime = System.nanoTime();
+ for (int i = 0; i < MAX_RETRY; i++) {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(
+ currLocation.internalEndPoint)) {
+ TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+ logger.debug("Response from {}: {}", location.getDataNodeId(), loadResp);
+ if (!loadResp.isAccepted()) {
+ logger.warn(loadResp.message);
+ phaseOneFailures.put(
+ new Pair<>(node, replicaSet),
+ new FragmentInstanceDispatchException(loadResp.status));
+ return false;
+ }
+ loadSucceed = true;
+ break;
+ } catch (ClientManagerException | TException e) {
+ lastConnectionError = e;
+ }
+
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ if (loadSucceed) {
+ break;
+ }
+ }
+
+ if (!loadSucceed) {
+ String warning = NODE_CONNECTION_ERROR;
+ logger.warn(warning, currLocation, lastConnectionError);
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage(warning + currLocation);
+ phaseOneFailures.put(
+ new Pair<>(node, replicaSet),
+ new FragmentInstanceDispatchException(status));
+ return false;
+ }
+ long timeConsumption = System.nanoTime() - startTime;
+ logger.debug("Time consumption: {}", timeConsumption);
+ locationStatistics.updateThroughput(
+ currLocation, node.getDataSize(), timeConsumption);
+
+ synchronized (maxDispatchTime) {
+ if (maxDispatchTime.get() < timeConsumption) {
+ maxDispatchTime.set(timeConsumption);
+ }
+ }
+ synchronized (minDispatchTime) {
+ if (minDispatchTime.get() > timeConsumption) {
+ minDispatchTime.set(timeConsumption);
+ }
+ }
+ sumDispatchTime.addAndGet(timeConsumption);
+ return true;
+ })
+ .collect(Collectors.toList());
+ long elapsedTime = System.nanoTime() - start;
+ statistic.dispatchNodesTime.addAndGet(elapsedTime);
+ logger.debug(
+ "Dispatched one node after {}ms, min/maxDispatching time: {}/{}ns, avg: {}ns, sum: {}ns, compressing: {}ns",
- elapsedTime / 1_000_000L, minDispatchTime.get(),
- maxDispatchTime.get(), sumDispatchTime.get() / subNodes.size(), sumDispatchTime.get(),
++ elapsedTime / 1_000_000L,
++ minDispatchTime.get(),
++ maxDispatchTime.get(),
++ sumDispatchTime.get() / subNodes.size(),
++ sumDispatchTime.get(),
+ sumCompressingTime.get());
+ return !subNodeResults.contains(false);
+ }
+
+ public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
+ long allStart = System.nanoTime();
+ allReplicaSets.add(replicaSet);
+ if (false) {
+ long start = System.nanoTime();
+ List<LoadTsFilePieceNode> split = pieceNodeSplitter.split(pieceNode);
+ long elapsedTime = System.nanoTime() - start;
+ statistic.splitTime.addAndGet(elapsedTime);
+ statistic.pieceNodeNum.incrementAndGet();
- logger.debug(
- "{} splits are generated after {}ms", split.size(),
- elapsedTime / 1_000_000L);
++ logger.debug("{} splits are generated after {}ms", split.size(), elapsedTime / 1_000_000L);
+ boolean success = dispatchPieceNodes(split, replicaSet);
+ statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ return success;
+ }
+
+ List<LoadTsFilePieceNode> subNodes;
+ if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
+ splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), replicaSet));
+ statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ return true;
+ } else {
+ long start = System.nanoTime();
+ Pair<Future<List<LoadTsFilePieceNode>>, TRegionReplicaSet> pair = splitFutures.poll();
+ try {
+ subNodes = pair.left.get();
+ long elapsedTime = System.nanoTime() - start;
+ statistic.splitTime.addAndGet(elapsedTime);
+ statistic.pieceNodeNum.incrementAndGet();
+ logger.debug(
- "{} splits are generated after {}ms", subNodes.size(),
- elapsedTime / 1_000_000L);
++ "{} splits are generated after {}ms", subNodes.size(), elapsedTime / 1_000_000L);
+
+ splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), replicaSet));
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Unexpected error during splitting node", e);
+ return false;
+ }
+ boolean success = dispatchPieceNodes(subNodes, pair.right);
+ statistic.dispatchNodeTime.addAndGet(System.nanoTime() - allStart);
+ return success;
+ }
+ }
+
+ public static class Statistic {
+
+ public long taskStartTime;
+ public long taskEndTime;
+ public AtomicLong rawSize = new AtomicLong();
+ public AtomicLong compressedSize = new AtomicLong();
+ public AtomicLong splitTime = new AtomicLong();
+ public AtomicLong pieceNodeNum = new AtomicLong();
+ public AtomicLong dispatchNodesTime = new AtomicLong();
+ public AtomicLong dispatchNodeTime = new AtomicLong();
+ public AtomicLong compressingTime = new AtomicLong();
+
+ public void logStatistic() {
+ logger.info("Time consumption: {}ms", taskEndTime - taskStartTime);
+ logger.info(
+ "Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {}, dispatchNodeTime: {}",
+ pieceNodeNum.get(),
- splitTime.get() / 1_000_000L, dispatchNodesTime.get() / 1_000_000L,
++ splitTime.get() / 1_000_000L,
++ dispatchNodesTime.get() / 1_000_000L,
+ dispatchNodeTime.get() / 1_000_000L);
+ logger.info(
+ "Transmission size: {}/{} ({}), compressionTime: {}ms",
+ compressedSize.get(),
+ rawSize.get(),
+ compressedSize.get() * 1.0 / rawSize.get(),
+ compressingTime.get() / 1_000_000L);
+ }
+ }
+
+ public Statistic getStatistic() {
+ return statistic;
+ }
+}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 5542e04cf65,55c5bc0aa3a..2cd6190bcf1
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@@ -205,7 -198,8 +205,7 @@@ public class TsFileSplitter
consumeChunkData(measurementId, chunkOffset, chunkData);
}
- timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]);
+ timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
- satisfiedLength = 0;
endTime =
timePartitionSlot.getStartTime()
+ TimePartitionUtils.getTimePartitionInterval();
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index 3994941c8b3,00000000000..9a4fdf3b96f
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@@ -1,591 -1,0 +1,616 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.nodesplit;
+
++import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
++import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
++import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
++import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
++import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
++import org.apache.iotdb.tsfile.file.header.ChunkHeader;
++import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
++import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
++import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
++import org.apache.iotdb.tsfile.read.common.Chunk;
++
++import net.ricecode.similarity.LevenshteinDistanceStrategy;
++import net.ricecode.similarity.SimilarityStrategy;
++import net.ricecode.similarity.StringSimilarityService;
++import net.ricecode.similarity.StringSimilarityServiceImpl;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
- import net.ricecode.similarity.LevenshteinDistanceStrategy;
- import net.ricecode.similarity.SimilarityStrategy;
- import net.ricecode.similarity.StringSimilarityService;
- import net.ricecode.similarity.StringSimilarityServiceImpl;
- import org.apache.iotdb.db.queryengine.execution.load.AlignedChunkData;
- import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
- import org.apache.iotdb.db.queryengine.execution.load.NonAlignedChunkData;
- import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
- import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
- import org.apache.iotdb.tsfile.file.header.ChunkHeader;
- import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
- import org.apache.iotdb.tsfile.read.common.Chunk;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
+
+public class ClusteringMeasurementSplitter implements PieceNodeSplitter {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusteringMeasurementSplitter.class);
+ private double groupFactor;
+ private int maxIteration;
+ private int dataSampleLength = 32;
+ private double stdErrThreshold = 1;
+ private long splitStartTime;
+ private long splitTimeBudget = 5;
+ private static final int MAX_CLUSTER_NUM = 500;
+
+ public ClusteringMeasurementSplitter(double groupFactor, int maxIteration) {
+ this.groupFactor = groupFactor;
+ this.maxIteration = maxIteration;
+ }
+
+ @Override
+ public List<LoadTsFilePieceNode> split(LoadTsFilePieceNode pieceNode) {
+ splitStartTime = System.currentTimeMillis();
+ if (pieceNode.isHasModification()) {
+ // the order of modifications should be preserved, so with modifications clustering cannot be
+ // used
+ return new OrderedMeasurementSplitter().split(pieceNode);
+ }
+
+ // split by measurement first
- Map<String, LoadTsFilePieceNode> measurementPieceNodeMap = new HashMap<>(
- pieceNode.getAllTsFileData().size());
++ Map<String, LoadTsFilePieceNode> measurementPieceNodeMap =
++ new HashMap<>(pieceNode.getAllTsFileData().size());
+ for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ String currMeasurement = chunkData.firstMeasurement();
+ LoadTsFilePieceNode pieceNodeSplit =
+ measurementPieceNodeMap.computeIfAbsent(
+ currMeasurement,
+ m -> new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), pieceNode.getTsFile()));
+ pieceNodeSplit.addTsFileData(chunkData);
+ }
+ // use clustering to merge similar measurements
+ List<LoadTsFilePieceNode> loadTsFilePieceNodes = clusterPieceNode(measurementPieceNodeMap);
+ if (logger.isDebugEnabled()) {
- logger.debug("Split distribution before refinement: {}",
- loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
++ logger.debug(
++ "Split distribution before refinement: {}",
++ loadTsFilePieceNodes.stream()
++ .map(l -> l.getAllTsFileData().size())
+ .collect(Collectors.toList()));
+ refineClusters(loadTsFilePieceNodes);
- logger.debug("Split distribution after refinement: {}",
- loadTsFilePieceNodes.stream().map(l -> l.getAllTsFileData().size())
++ logger.debug(
++ "Split distribution after refinement: {}",
++ loadTsFilePieceNodes.stream()
++ .map(l -> l.getAllTsFileData().size())
+ .collect(Collectors.toList()));
+ }
+ return loadTsFilePieceNodes;
+ }
+
+ private void refineClusters(List<LoadTsFilePieceNode> pieceNodes) {
- double average = pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average()
- .orElse(0.0);
++ double average =
++ pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average().orElse(0.0);
+ double finalAverage1 = average;
- double stderr = Math.sqrt(pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
- .mapToDouble(s -> (s - finalAverage1) * (s - finalAverage1)).sum() / pieceNodes.size());
- logger.debug("{} splits before refinement, average/stderr: {}/{}", pieceNodes.size(), average,
- stderr);
++ double stderr =
++ Math.sqrt(
++ pieceNodes.stream()
++ .mapToLong(LoadTsFilePieceNode::getDataSize)
++ .mapToDouble(s -> (s - finalAverage1) * (s - finalAverage1))
++ .sum()
++ / pieceNodes.size());
++ logger.debug(
++ "{} splits before refinement, average/stderr: {}/{}", pieceNodes.size(), average, stderr);
+
+ while (stderr > average * stdErrThreshold
+ && System.currentTimeMillis() - splitStartTime < splitTimeBudget) {
+ pieceNodes.sort(Comparator.comparingLong(LoadTsFilePieceNode::getDataSize));
+ LoadTsFilePieceNode smallestPiece = pieceNodes.get(0);
+ LoadTsFilePieceNode largestPiece = pieceNodes.get(pieceNodes.size() - 1);
+ double smallDiff = average - smallestPiece.getDataSize();
+ double largeDiff = largestPiece.getDataSize() - average;
+ if (smallDiff > largeDiff) {
+ mergeSmallPiece(pieceNodes);
+ } else {
+ if (!splitLargePiece(pieceNodes)) {
+ // the largest node may not be splittable (only one series), merge the smallest instead
+ mergeSmallPiece(pieceNodes);
+ }
+ }
+
- average = pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average()
- .orElse(0.0);
++ average =
++ pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average().orElse(0.0);
+ double finalAverage = average;
- stderr = Math.sqrt(pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
- .mapToDouble(s -> (s - finalAverage) * (s - finalAverage)).sum() / pieceNodes.size());
++ stderr =
++ Math.sqrt(
++ pieceNodes.stream()
++ .mapToLong(LoadTsFilePieceNode::getDataSize)
++ .mapToDouble(s -> (s - finalAverage) * (s - finalAverage))
++ .sum()
++ / pieceNodes.size());
+ logger.debug("{} splits, average/stderr: {}/{}", pieceNodes.size(), average, stderr);
+ }
- logger.debug("{} splits after refinement, average/stderr: {}/{}", pieceNodes.size(), average,
- stderr);
++ logger.debug(
++ "{} splits after refinement, average/stderr: {}/{}", pieceNodes.size(), average, stderr);
+ }
+
+ private void mergeSmallPiece(List<LoadTsFilePieceNode> pieceNodes) {
+ LoadTsFilePieceNode pieceA = pieceNodes.remove(0);
+ LoadTsFilePieceNode pieceB = pieceNodes.remove(0);
- LoadTsFilePieceNode newPiece = new LoadTsFilePieceNode(pieceA.getPlanNodeId(),
- pieceA.getTsFile());
++ LoadTsFilePieceNode newPiece =
++ new LoadTsFilePieceNode(pieceA.getPlanNodeId(), pieceA.getTsFile());
+ pieceA.getAllTsFileData().forEach(newPiece::addTsFileData);
+ pieceB.getAllTsFileData().forEach(newPiece::addTsFileData);
+ pieceNodes.add(newPiece);
- pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
- .average().orElse(0.0);
- pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize)
- .sum();
++ pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).average().orElse(0.0);
++ pieceNodes.stream().mapToLong(LoadTsFilePieceNode::getDataSize).sum();
+ }
+
+ private boolean splitLargePiece(List<LoadTsFilePieceNode> pieceNodes) {
+ LoadTsFilePieceNode oldPiece = pieceNodes.remove(pieceNodes.size() - 1);
- LoadTsFilePieceNode newNodeA = new LoadTsFilePieceNode(oldPiece.getPlanNodeId(),
- oldPiece.getTsFile());
- LoadTsFilePieceNode newNodeB = new LoadTsFilePieceNode(oldPiece.getPlanNodeId(),
- oldPiece.getTsFile());
++ LoadTsFilePieceNode newNodeA =
++ new LoadTsFilePieceNode(oldPiece.getPlanNodeId(), oldPiece.getTsFile());
++ LoadTsFilePieceNode newNodeB =
++ new LoadTsFilePieceNode(oldPiece.getPlanNodeId(), oldPiece.getTsFile());
+ long sizeTarget = oldPiece.getDataSize() / 2;
+
+ // cannot break a series into two pieces since the chunk order must be preserved
+ String currMeasurement = null;
+ List<TsFileData> allTsFileData = oldPiece.getAllTsFileData();
+ int i = 0;
+ for (; i < allTsFileData.size(); i++) {
+ TsFileData tsFileData = allTsFileData.get(i);
+ if (tsFileData.isModification()) {
+ // modifications follows previous chunk data
+ newNodeA.addTsFileData(tsFileData);
+ } else {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ if (currMeasurement == null || currMeasurement.equals(chunkData.firstMeasurement())) {
+ // the first chunk or chunk of the same series, add it to A
+ currMeasurement = chunkData.firstMeasurement();
+ newNodeA.addTsFileData(tsFileData);
+ sizeTarget -= chunkData.getDataSize();
+ } else {
+ // chunk of the next series
+ if (sizeTarget < 0) {
+ // splitA is full, break to fill splitB
+ break;
+ } else {
+ // splitA is not full, also add this series to A
+ currMeasurement = chunkData.firstMeasurement();
+ newNodeA.addTsFileData(tsFileData);
+ sizeTarget -= chunkData.getDataSize();
+ }
+ }
+ }
+ }
+ // add remaining series to B
+ for (; i < allTsFileData.size(); i++) {
+ TsFileData tsFileData = allTsFileData.get(i);
+ newNodeB.addTsFileData(tsFileData);
+ }
+
+ pieceNodes.add(newNodeA);
+ if (!newNodeB.getAllTsFileData().isEmpty()) {
+ pieceNodes.add(newNodeB);
+ return true;
+ }
+ return false;
+ }
+
+ private List<LoadTsFilePieceNode> clusterPieceNode(
+ Map<String, LoadTsFilePieceNode> measurementPieceNodeMap) {
+ // convert to feature vector
+ Map<String, SeriesFeatureVector> measurementVectorMap =
+ new HashMap<>(measurementPieceNodeMap.size());
+ for (Entry<String, LoadTsFilePieceNode> entry : measurementPieceNodeMap.entrySet()) {
+ measurementVectorMap.put(entry.getKey(), convertToFeature(entry.getValue()));
+ }
+ // normalize
+ normalize(measurementVectorMap.values());
+ Map<String, double[]> doubleVectors = new HashMap<>(measurementPieceNodeMap.size());
+ for (Entry<String, SeriesFeatureVector> e : measurementVectorMap.entrySet()) {
+ doubleVectors.put(e.getKey(), e.getValue().numericVector);
+ }
+ // clustering
+ int numCluster = Math.min((int) (doubleVectors.size() / groupFactor), MAX_CLUSTER_NUM);
+ if (numCluster < 1) {
+ numCluster = 1;
+ }
+ VectorDistance distance = new EuclideanDistance();
+ Clustering clustering = new KMeans(numCluster, maxIteration);
+ List<List<String>> clusterResult = clustering.cluster(doubleVectors, distance);
+ // collect result
+ List<LoadTsFilePieceNode> clusteredNodes = new ArrayList<>();
+ for (List<String> cluster : clusterResult) {
+ if (cluster.isEmpty()) {
+ continue;
+ }
+ LoadTsFilePieceNode pieceNode = measurementPieceNodeMap.get(cluster.get(0));
+ LoadTsFilePieceNode clusterNode =
+ new LoadTsFilePieceNode(pieceNode.getPlanNodeId(), pieceNode.getTsFile());
+ for (String measurementId : cluster) {
+ pieceNode = measurementPieceNodeMap.get(measurementId);
+ for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+ clusterNode.addTsFileData(tsFileData);
+ }
+ }
+ clusteredNodes.add(clusterNode);
+ }
+
+ return clusteredNodes;
+ }
+
+ private void normalize(Collection<SeriesFeatureVector> vectors) {
+ String firstMeasurementId = null;
+ int minDataSize = Integer.MAX_VALUE;
+ int maxDataSize = Integer.MIN_VALUE;
+ int minDataType = Integer.MAX_VALUE;
+ int maxDataType = Integer.MIN_VALUE;
+ int minCompressionType = Integer.MAX_VALUE;
+ int maxCompressionType = Integer.MIN_VALUE;
+ int minEncodingType = Integer.MAX_VALUE;
+ int maxEncodingType = Integer.MIN_VALUE;
+ int minNumOfPages = Integer.MAX_VALUE;
+ int maxNumOfPages = Integer.MIN_VALUE;
+ String firstDataSample = null;
+ for (SeriesFeatureVector vector : vectors) {
+ if (firstMeasurementId == null) {
+ firstMeasurementId = vector.measurementId;
+ firstDataSample = vector.dataSample;
+ }
+ minDataSize = Math.min(minDataSize, vector.dataSize);
+ maxDataSize = Math.max(maxDataSize, vector.dataSize);
+ minDataType = Math.min(minDataType, vector.dataType.ordinal());
+ maxDataType = Math.max(maxDataType, vector.dataType.ordinal());
+ minCompressionType = Math.min(minCompressionType, vector.compressionType.ordinal());
+ maxCompressionType = Math.max(maxCompressionType, vector.compressionType.ordinal());
+ minEncodingType = Math.min(minEncodingType, vector.encodingType.ordinal());
+ maxEncodingType = Math.max(maxEncodingType, vector.encodingType.ordinal());
+ minNumOfPages = Math.min(minNumOfPages, vector.numOfPages);
+ maxNumOfPages = Math.max(maxNumOfPages, vector.numOfPages);
+ }
+
+ SimilarityStrategy strategy = new LevenshteinDistanceStrategy();
+ StringSimilarityService service = new StringSimilarityServiceImpl(strategy);
+ int finalMinDataSize = minDataSize;
+ String finalFirstMeasurementId = firstMeasurementId;
+ int finalMaxDataSize = maxDataSize;
+ int finalMinDataType = minDataType;
+ int finalMaxDataType = maxDataType;
+ int finalMinCompressionType = minCompressionType;
+ int finalMaxCompressionType = maxCompressionType;
+ int finalMinEncodingType = minEncodingType;
+ int finalMaxEncodingType = maxEncodingType;
+ int finalMinNumOfPages = minNumOfPages;
+ int finalMaxNumOfPages = maxNumOfPages;
+ String finalFirstDataSample = firstDataSample;
- vectors.stream().parallel().forEach(vector -> {
- vector.numericVector[0] = service.score(finalFirstMeasurementId, vector.measurementId);
- vector.numericVector[1] = (vector.dataSize - finalMinDataSize) * 1.0 / (finalMaxDataSize - finalMinDataSize);
- vector.numericVector[2] =
- (vector.dataType.ordinal() - finalMinDataType) * 1.0 / (finalMaxDataType - finalMinDataType);
- vector.numericVector[3] =
- (vector.compressionType.ordinal() - finalMinCompressionType)
- * 1.0
- / (finalMaxCompressionType - finalMinCompressionType);
- vector.numericVector[4] =
- (vector.encodingType.ordinal() - finalMinEncodingType)
- * 1.0
- / (finalMaxEncodingType - finalMinEncodingType);
- vector.numericVector[5] =
- (vector.numOfPages - finalMinNumOfPages) * 1.0 / (finalMaxNumOfPages - finalMinNumOfPages);
- vector.numericVector[6] = service.score(finalFirstDataSample, vector.dataSample);
- double[] numericVector = vector.numericVector;
- for (int i = 0; i < numericVector.length; i++) {
- if (Double.isNaN(numericVector[i])) {
- numericVector[i] = 0.0;
- } else if (Double.isInfinite(numericVector[i])) {
- numericVector[i] = 1.0;
- }
- }
- });
++ vectors.stream()
++ .parallel()
++ .forEach(
++ vector -> {
++ vector.numericVector[0] =
++ service.score(finalFirstMeasurementId, vector.measurementId);
++ vector.numericVector[1] =
++ (vector.dataSize - finalMinDataSize)
++ * 1.0
++ / (finalMaxDataSize - finalMinDataSize);
++ vector.numericVector[2] =
++ (vector.dataType.ordinal() - finalMinDataType)
++ * 1.0
++ / (finalMaxDataType - finalMinDataType);
++ vector.numericVector[3] =
++ (vector.compressionType.ordinal() - finalMinCompressionType)
++ * 1.0
++ / (finalMaxCompressionType - finalMinCompressionType);
++ vector.numericVector[4] =
++ (vector.encodingType.ordinal() - finalMinEncodingType)
++ * 1.0
++ / (finalMaxEncodingType - finalMinEncodingType);
++ vector.numericVector[5] =
++ (vector.numOfPages - finalMinNumOfPages)
++ * 1.0
++ / (finalMaxNumOfPages - finalMinNumOfPages);
++ vector.numericVector[6] = service.score(finalFirstDataSample, vector.dataSample);
++ double[] numericVector = vector.numericVector;
++ for (int i = 0; i < numericVector.length; i++) {
++ if (Double.isNaN(numericVector[i])) {
++ numericVector[i] = 0.0;
++ } else if (Double.isInfinite(numericVector[i])) {
++ numericVector[i] = 1.0;
++ }
++ }
++ });
+ }
+
+ private SeriesFeatureVector convertToFeature(LoadTsFilePieceNode pieceNode) {
+ List<TsFileData> allTsFileData = pieceNode.getAllTsFileData();
+ SeriesFeatureVector vector = null;
+ for (TsFileData tsFileData : allTsFileData) {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ if (vector == null) {
+ vector = SeriesFeatureVector.fromChunkData(chunkData, dataSampleLength);
+ } else {
+ vector.mergeChunkData(chunkData);
+ }
+ }
+ return vector;
+ }
+
+ public static class SeriesFeatureVector {
+
+ private String measurementId;
+ private int dataSize;
+ private TSDataType dataType;
+ private CompressionType compressionType;
+ private TSEncoding encodingType;
+ private int numOfPages;
+ private String dataSample;
+ private double[] numericVector = new double[7];
+
+ public static SeriesFeatureVector fromChunkData(ChunkData data, int dataSampleLength) {
+ ChunkHeader chunkHeader;
+ Chunk chunk;
+ ByteBuffer chunkBuffer;
+ // sample a buffer from the chunk data
+ if (data.isAligned()) {
+ AlignedChunkData alignedChunkData = (AlignedChunkData) data;
+ chunkHeader = alignedChunkData.getChunkHeaderList().get(0);
+ if (!alignedChunkData.getChunkList().isEmpty()) {
+ chunk = alignedChunkData.getChunkList().get(0);
+ ByteBuffer buffer = chunk.getData();
+ int sampleLength = Math.min(dataSampleLength, buffer.remaining());
+ chunkBuffer = buffer.slice();
+ chunkBuffer.limit(sampleLength);
+ } else {
+ chunkBuffer = ByteBuffer.wrap(alignedChunkData.getByteStream().getBuf());
+ int sampleLength = Math.min(dataSampleLength, chunkBuffer.remaining());
+ chunkBuffer.limit(sampleLength);
+ }
+ } else {
+ NonAlignedChunkData nonAlignedChunkData = (NonAlignedChunkData) data;
+ chunkHeader = nonAlignedChunkData.getChunkHeader();
+ chunk = nonAlignedChunkData.getChunk();
+ if (chunk != null) {
+ ByteBuffer buffer = chunk.getData();
+ int sampleLength = Math.min(dataSampleLength, buffer.remaining());
+ chunkBuffer = buffer.slice();
+ chunkBuffer.limit(sampleLength);
+ } else {
+ chunkBuffer = ByteBuffer.wrap(nonAlignedChunkData.getByteStream().getBuf());
+ int sampleLength = Math.min(dataSampleLength, chunkBuffer.remaining());
+ chunkBuffer.limit(sampleLength);
+ }
+ }
+
+ SeriesFeatureVector vector = new SeriesFeatureVector();
+ vector.measurementId = chunkHeader.getMeasurementID();
+ vector.dataSize = chunkHeader.getDataSize();
+ vector.dataType = chunkHeader.getDataType();
+ vector.compressionType = chunkHeader.getCompressionType();
+ vector.encodingType = chunkHeader.getEncodingType();
+ vector.numOfPages = chunkHeader.getNumOfPages();
+ vector.dataSample =
+ new String(
+ chunkBuffer.array(),
+ chunkBuffer.arrayOffset() + chunkBuffer.position(),
+ chunkBuffer.remaining());
+ return vector;
+ }
+
+ public void mergeChunkData(ChunkData data) {
+ ChunkHeader chunkHeader;
+ if (data.isAligned()) {
+ AlignedChunkData alignedChunkData = (AlignedChunkData) data;
+ chunkHeader = alignedChunkData.getChunkHeaderList().get(0);
+ } else {
+ NonAlignedChunkData nonAlignedChunkData = (NonAlignedChunkData) data;
+ chunkHeader = nonAlignedChunkData.getChunkHeader();
+ }
+ dataSize += chunkHeader.getDataSize();
+ numOfPages += chunkHeader.getNumOfPages();
+ }
+ }
+
+ public interface VectorDistance {
+
+ double calculate(double[] v1, double[] v2);
+ }
+
+ public static class EuclideanDistance implements VectorDistance {
+
+ @Override
+ public double calculate(double[] v1, double[] v2) {
+ double sum = 0;
+ for (int i = 0; i < v1.length; i++) {
+ sum += (v1[i] - v2[i]) * (v1[i] - v2[i]);
+ }
+ return Math.sqrt(sum) + 1;
+ }
+ }
+
+ public interface Clustering {
+
+ List<List<String>> cluster(Map<String, double[]> tagVectorMap, VectorDistance distance);
+ }
+
+ public class KMeans implements Clustering {
+
+ private int k;
+ private int maxIteration;
+ private double[][] centroids;
+ private AtomicInteger[] centroidCounters;
+ private Random random;
+ private Map<Entry<String, double[]>, Integer> recordCentroidMapping;
+ private int vecLength = 0;
+
+ public KMeans(int k, int maxIteration) {
+ this.k = k;
+ this.maxIteration = maxIteration;
+ this.centroids = new double[k][];
+ this.centroidCounters = new AtomicInteger[k];
+ for (int i = 0; i < centroidCounters.length; i++) {
+ centroidCounters[i] = new AtomicInteger();
+ }
+ this.random = new Random();
+ this.recordCentroidMapping = new ConcurrentHashMap<>();
+ }
+
+ private void clearCentroidCounter() {
+ for (AtomicInteger centroidCounter : centroidCounters) {
+ centroidCounter.set(0);
+ }
+ }
+
+ @Override
+ public List<List<String>> cluster(Map<String, double[]> tagVectorMap, VectorDistance distance) {
+ recordCentroidMapping.clear();
+ if (k > tagVectorMap.size()) {
+ k = tagVectorMap.size();
+ this.centroids = new double[k][];
+ }
+
+ for (Entry<String, double[]> entry : tagVectorMap.entrySet()) {
+ vecLength = entry.getValue().length;
+ }
+
+ randomCentroid(vecLength, tagVectorMap);
+
+ for (int i = 0; i < maxIteration; i++) {
+ if (!assignCentroid(tagVectorMap, distance)) {
+ break;
+ }
+ newCentroid();
+ clearCentroidCounter();
+ if (System.currentTimeMillis() - splitStartTime > splitTimeBudget) {
+ break;
+ }
+ }
+
+ Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
+ collectCentroidRecordMapping();
+ return centroidRecordMap.values().stream()
+ .map(l -> l.stream().map(Entry::getKey).collect(Collectors.toList()))
+ .collect(Collectors.toList());
+ }
+
+ private Map<Integer, List<Entry<String, double[]>>> collectCentroidRecordMapping() {
+ Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping = new ConcurrentHashMap<>();
+ recordCentroidMapping.entrySet().stream()
+ .parallel()
+ .forEach(
+ e ->
+ centroidRecordMapping.compute(
+ e.getValue(),
+ (key, oldV) -> {
+ if (oldV == null) {
+ oldV = new ArrayList<>();
+ }
+ oldV.add(e.getKey());
+ return oldV;
+ }));
+ return centroidRecordMapping;
+ }
+
+ private void newCentroid() {
+ Map<Integer, List<Entry<String, double[]>>> centroidRecordMapping =
+ collectCentroidRecordMapping();
+ centroidRecordMapping.entrySet().stream()
+ .parallel()
+ .forEach(
+ e -> {
+ Integer centroidId = e.getKey();
+ List<Entry<String, double[]>> records = e.getValue();
+ int recordNum = records.size();
+ double[] sumVec = new double[vecLength];
+ for (Entry<String, double[]> record : records) {
+ for (int i = 0; i < sumVec.length; i++) {
+ sumVec[i] += record.getValue()[i];
+ }
+ }
+ for (int i = 0; i < sumVec.length; i++) {
+ sumVec[i] = sumVec[i] / recordNum;
+ }
+ centroids[centroidId] = sumVec;
+ });
+ }
+
+ private boolean assignCentroid(Map<String, double[]> tagVectorMap, VectorDistance distance) {
+ AtomicBoolean centroidUpdated = new AtomicBoolean(false);
+ tagVectorMap.entrySet().stream()
+ .parallel()
+ .forEach(
+ e -> {
+ double[] vector = e.getValue();
+ double minDist = Double.MAX_VALUE;
+ int nearestCentroid = 0;
+ for (int i = 0; i < centroids.length; i++) {
+ double dist =
- distance.calculate(vector, centroids[i]) * (1
- + centroidCounters[i].get() * 0.1);
++ distance.calculate(vector, centroids[i])
++ * (1 + centroidCounters[i].get() * 0.1);
+ if (dist < minDist) {
+ minDist = dist;
+ nearestCentroid = i;
+ }
+ }
+
+ centroidCounters[nearestCentroid].incrementAndGet();
+ int finalNearestCentroid = nearestCentroid;
+ recordCentroidMapping.put(e, finalNearestCentroid);
+ recordCentroidMapping.compute(
+ e,
+ (t, oc) -> {
+ if (oc == null || oc != finalNearestCentroid) {
+ centroidUpdated.set(true);
+ return finalNearestCentroid;
+ } else {
+ return oc;
+ }
+ });
+ });
+ return centroidUpdated.get();
+ }
+
+ private void randomCentroid(int vecLength, Map<String, double[]> tagVectorMap) {
+ pickRandomCentroid(tagVectorMap);
+ // genRandomCentroid(vecLength);
+ }
+
+ private void pickRandomCentroid(Map<String, double[]> tagVectorMap) {
+ List<double[]> recordVectors = new ArrayList<>(tagVectorMap.values());
+ Collections.shuffle(recordVectors);
+ for (int i = 0; i < k; i++) {
+ centroids[i] = recordVectors.get(i);
+ }
+ }
+
+ private void genRandomCentroid(int vecLength) {
+ for (int i = 0; i < k; i++) {
+ double[] centroid = new double[vecLength];
+ for (int j = 0; j < vecLength; j++) {
+ centroid[j] = random.nextDouble();
+ }
+ centroids[i] = centroid;
+ }
+ }
+ }
+}
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 2eb3500eaa8,a1cf6f83749..48423aa5f92
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@@ -186,15 -194,9 +189,16 @@@ public class LoadTsFileScheduler implem
private boolean firstPhase(LoadSingleTsFileNode node) {
try {
- TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node);
+ TsFileDataManager tsFileDataManager =
+ new TsFileDataManager(
+ this::dispatchOnePieceNode,
+ node.getPlanNodeId(),
+ node.getTsFileResource().getTsFile(),
+ partitionFetcher,
- MAX_MEMORY_SIZE);
++ MAX_MEMORY_SIZE,
++ queryContext.getSession().getUserName());
new TsFileSplitter(
- node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData)
+ node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData, 0)
.splitTsFileByDataPartition();
if (!tsFileDataManager.sendAllTsFileData()) {
stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 046c52794fb,00000000000..2e2db3dd13e
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@@ -1,199 -1,0 +1,205 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
- import static org.junit.Assert.assertEquals;
-
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.util.Collections;
- import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
++
+import org.apache.thrift.TException;
+import org.junit.Test;
+
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.Collections;
++import java.util.List;
++
++import static org.junit.Assert.assertEquals;
++
+public class LoadTsFileManagerTest extends TestBase {
+
+ private LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
+ private long maxSplitSize = 128 * 1024 * 1024;
+
+ @Override
+ public void setup() throws IOException, WriteProcessException, DataRegionException {
+ fileNum = 10;
+ seriesNum = 100;
+ deviceNum = 100;
+ super.setup();
+ }
+
+ @Test
+ public void test() throws IOException {
+
+ LoadTsFileNode loadTsFileNode =
+ new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
+ DataPartitionBatchFetcher partitionBatchFetcher = dummyDataPartitionBatchFetcher();
+ TsFileSplitSender splitSender =
+ new TsFileSplitSender(
+ loadTsFileNode,
+ partitionBatchFetcher,
+ TimePartitionUtils.getTimePartitionInterval(),
+ internalServiceClientManager,
+ false,
+ maxSplitSize,
- 100);
++ 100,
++ "root");
+ long start = System.currentTimeMillis();
+ splitSender.start();
+ long timeConsumption = System.currentTimeMillis() - start;
+
+ System.out.printf("Split ends after %dms\n", timeConsumption);
+
+ ConsensusGroupId d1GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
+ DataRegion dataRegion = dataRegionMap.get(d1GroupId.convertToTConsensusGroupId());
+ List<TsFileResource> tsFileList = dataRegion.getTsFileManager().getTsFileList(false);
+ // all input files should be shallow-merged into one
+ System.out.printf("Loaded TsFiles: %s\n", tsFileList);
+ assertEquals(1, tsFileList.size());
+
+ long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
+ long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
+ int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+ long endTime = chunkTimeRange * (fileNum - 1) + pointInterval * (chunkPointNum - 1);
+ // check device time
+ TsFileResource tsFileResource = tsFileList.get(0);
+ for (int i = 0; i < deviceNum; i++) {
+ assertEquals(0, tsFileResource.getStartTime("d" + i));
+ assertEquals(endTime, tsFileResource.getEndTime("d" + i));
+ }
+
+ // read and check the generated file
- try (TsFileReader reader = new TsFileReader(
- new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
++ try (TsFileReader reader =
++ new TsFileReader(new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
+ for (int dn = 0; dn < deviceNum; dn++) {
+ // "Simple_" is generated with linear function and is easy to check
- QueryExpression queryExpression = QueryExpression.create(
- Collections.singletonList(new Path("d" + dn, "Simple_22", false)), null);
++ QueryExpression queryExpression =
++ QueryExpression.create(
++ Collections.singletonList(new Path("d" + dn, "Simple_22", false)), null);
+ QueryDataSet dataSet = reader.query(queryExpression);
+ int i = 0;
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ long currTime =
+ chunkTimeRange * (i / chunkPointNum) + pointInterval * (i % chunkPointNum);
+ assertEquals(currTime, record.getTimestamp());
+ assertEquals(1.0 * (i % chunkPointNum), record.getFields().get(0).getDoubleV(), 0.0001);
+ i++;
+ }
+ assertEquals(chunkPointNum * fileNum, i);
+ }
+ }
+ }
+
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
+ throws TException, IOException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ ByteBuffer buf = req.body.slice();
+ if (req.isSetCompressionType()) {
+ CompressionType compressionType = CompressionType.deserialize(req.compressionType);
+ IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
+ int uncompressedLength = req.getUncompressedLength();
+ ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
- unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(),
- allocate.array(), 0);
++ unCompressor.uncompress(
++ buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(), allocate.array(), 0);
+ allocate.limit(uncompressedLength);
+ buf = allocate;
+ }
+
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
- loadTsFileManager.writeToDataRegion(dataRegionMap.get(req.consensusGroupId), pieceNode,
- req.uuid);
++ loadTsFileManager.writeToDataRegion(
++ dataRegionMap.get(req.consensusGroupId), pieceNode, req.uuid);
+
+ // forward to other replicas in the group
+ if (req.isRelay) {
+ req.isRelay = false;
+ TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
- regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
- TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
- if (!otherPoint.equals(tEndpoint)) {
- try {
- handleTsFilePieceNode(req, otherPoint);
- } catch (TException | IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
++ regionReplicaSet.getDataNodeLocations().stream()
++ .parallel()
++ .forEach(
++ dataNodeLocation -> {
++ TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
++ if (!otherPoint.equals(tEndpoint)) {
++ try {
++ handleTsFilePieceNode(req, otherPoint);
++ } catch (TException | IOException e) {
++ throw new RuntimeException(e);
++ }
++ }
++ });
+ }
+
+ return new TLoadResp()
+ .setAccepted(true)
+ .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint)
+ throws LoadFileException, IOException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+
+ // forward to other replicas in the group
+ if (req.useConsensus) {
+ req.useConsensus = false;
+ TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+ if (!otherPoint.equals(tEndpoint)) {
+ handleTsLoadCommand(req, otherPoint);
+ }
+ }
+ }
+
+ loadTsFileManager.loadAll(req.uuid, false);
+
+ return new TLoadResp()
+ .setAccepted(true)
+ .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
-
+}
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
index 71efb778a47,00000000000..c675a9f02f8
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@@ -1,77 -1,0 +1,77 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class MergedTsFileSplitterTest extends TestBase {
+
+ private List<TsFileData> resultSet = new ArrayList<>();
+
+ @Test
+ public void testSplit() throws IOException {
+ long start = System.currentTimeMillis();
+ MergedTsFileSplitter splitter =
+ new MergedTsFileSplitter(
+ files,
+ this::consumeSplit,
+ IoTDBThreadPoolFactory.newThreadPool(
+ 16,
+ Integer.MAX_VALUE,
+ 20,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory("MergedTsFileSplitter"),
+ "MergedTsFileSplitter"),
+ TimePartitionUtils.getTimePartitionInterval(),
+ fileNum);
+ try {
+ splitter.splitTsFileByDataPartition();
+ for (TsFileData tsFileData : resultSet) {
+ // System.out.println(tsFileData);
+ }
+ } finally {
+ splitter.close();
+ }
+ System.out.printf(
+ "%d splits after %dms\n", resultSet.size(), System.currentTimeMillis() - start);
+ assertEquals(resultSet.size(), expectedChunkNum());
+ }
+
+ public boolean consumeSplit(TsFileData data) {
+ resultSet.add(data);
+ if (resultSet.size() % 1000 == 0) {
+ System.out.printf("%d chunks split\n", resultSet.size());
+ }
+ return true;
+ }
+}
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index a8e7ee3b2bf,00000000000..de0bf3c9590
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@@ -1,438 -1,0 +1,431 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
- import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
-
- import java.util.Comparator;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
++import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.FileUtils;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
- import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
+import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGenerator;
+import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGeneratorFactory;
+import org.apache.iotdb.db.utils.SequenceUtils.GaussianDoubleSequenceGenerator;
+import org.apache.iotdb.db.utils.SequenceUtils.SimpleDoubleSequenceGenerator;
+import org.apache.iotdb.db.utils.SequenceUtils.UniformDoubleSequenceGenerator;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
- import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
++import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
++import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
++
+public class TestBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
+ public static final String BASE_OUTPUT_PATH = "target".concat(File.separator).concat("loadTest");
+ public static final String PARTIAL_PATH_STRING =
+ "%s" + File.separator + "%d" + File.separator + "%d" + File.separator;
+ public static final String TEST_TSFILE_PATH =
+ BASE_OUTPUT_PATH + File.separator + "testTsFile".concat(File.separator) + PARTIAL_PATH_STRING;
+
+ protected int fileNum = 100;
+ // series number of each file, sn non-aligned series and 1 aligned series with sn measurements
+ protected int seriesNum = 1000;
+ protected int deviceNum = 100;
+ // number of chunks of each series in a file, each series has only one chunk in a file
+ protected double chunkTimeRangeRatio = 0.001;
+ // the interval between two consecutive points of a series
+ protected long pointInterval = 50_000;
- protected List<Pair<String, DoubleSequenceGeneratorFactory>> measurementSequenceGeneratorPairs = Arrays.asList(
- new Pair<>("Simple_", new SimpleDoubleSequenceGenerator.Factory()),
- new Pair<>("UniformA_", new UniformDoubleSequenceGenerator.Factory(1.0)),
- new Pair<>("GaussianA_", new GaussianDoubleSequenceGenerator.Factory(1.0, 1.0)),
- new Pair<>("UniformB_", new UniformDoubleSequenceGenerator.Factory(10.0)),
- new Pair<>("GaussianB_", new GaussianDoubleSequenceGenerator.Factory(15.0, 3.0))
- );
++ protected List<Pair<String, DoubleSequenceGeneratorFactory>> measurementSequenceGeneratorPairs =
++ Arrays.asList(
++ new Pair<>("Simple_", new SimpleDoubleSequenceGenerator.Factory()),
++ new Pair<>("UniformA_", new UniformDoubleSequenceGenerator.Factory(1.0)),
++ new Pair<>("GaussianA_", new GaussianDoubleSequenceGenerator.Factory(1.0, 1.0)),
++ new Pair<>("UniformB_", new UniformDoubleSequenceGenerator.Factory(10.0)),
++ new Pair<>("GaussianB_", new GaussianDoubleSequenceGenerator.Factory(15.0, 3.0)));
+ protected final List<File> files = new ArrayList<>();
+ protected final List<TsFileResource> tsFileResources = new ArrayList<>();
+ protected IPartitionFetcher partitionFetcher;
+ // the key is deviceId, not partitioned by time in the simple test
+ protected Map<String, TRegionReplicaSet> partitionTable = new HashMap<>();
+ protected Map<TConsensusGroupId, DataRegion> dataRegionMap = new HashMap<>();
+ protected Map<ConsensusGroupId, TRegionReplicaSet> groupId2ReplicaSetMap = new HashMap<>();
+ protected IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ internalServiceClientManager;
+ // the third key is UUid, the forth key is targetFile
+
+ private int groupSizeInByte;
+
+ @Before
+ public void setup() throws IOException, WriteProcessException, DataRegionException {
+ setupFiles();
+ logger.info("{} files set up", files.size());
+ partitionFetcher = dummyPartitionFetcher();
+ setupPartitionTable();
+ setupClientManager();
+ groupSizeInByte = TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte((int) (GB));
+ }
+
+ @After
+ public void cleanup() {
+ FileUtils.deleteDirectory(new File(BASE_OUTPUT_PATH));
+ FileUtils.deleteDirectory(new File("target" + File.separator + "data"));
+ TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(groupSizeInByte);
+ }
+
+ public int expectedChunkNum() {
+ double totalTimeRange = chunkTimeRangeRatio * fileNum;
+ int splitChunkNum = 0;
+ // if the boundary of the ith partition does not overlap a chunk, it introduces an additional
+ // split
+ // TODO: due to machine precision, the calculation may have error. Also, if the data amount is
+ // too large, there could be more than one chunk for each series in the file.
+ for (int i = 0; i <= totalTimeRange; i++) {
+ if (i * 1.0 % chunkTimeRangeRatio > 0.00001) {
+ splitChunkNum += 1;
+ }
+ }
+ return (splitChunkNum + fileNum) * seriesNum * 2;
+ }
+
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
+ throws TException, IOException {
+ return new TLoadResp()
+ .setAccepted(true)
+ .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint)
+ throws LoadFileException, IOException {
+ return new TLoadResp()
+ .setAccepted(true)
+ .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ public TProtocol dummyProtocol() throws TTransportException {
+ return new TBinaryProtocol(new TByteBuffer(ByteBuffer.allocate(0)));
+ }
+
+ public void setupClientManager() {
+ SyncDataNodeInternalServiceClientPoolFactory poolFactory =
+ new SyncDataNodeInternalServiceClientPoolFactory();
+ internalServiceClientManager =
+ new ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>(poolFactory) {
+ @Override
+ public SyncDataNodeInternalServiceClient borrowClient(TEndPoint node) {
+ try {
+ return new SyncDataNodeInternalServiceClient(
+ dummyProtocol(), new ThriftClientProperty.Builder().build(), node, this) {
+ @Override
+ public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
+ try {
+ return handleTsFilePieceNode(req, getTEndpoint());
+ } catch (IOException e) {
+ throw new TException(e);
+ }
+ }
+
+ @Override
+ public TLoadResp sendLoadCommand(TLoadCommandReq req) throws TException {
+ try {
+ return handleTsLoadCommand(req, getTEndpoint());
+ } catch (LoadFileException | IOException e) {
+ throw new TException(e);
+ }
+ }
+
+ @Override
- public void close() {
- }
++ public void close() {}
+ };
+ } catch (TTransportException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
- public void clear(TEndPoint node) {
- }
++ public void clear(TEndPoint node) {}
+
+ @Override
- public void close() {
- }
++ public void close() {}
+ };
+ }
+
+ public void setupPartitionTable() throws DataRegionException {
+ ConsensusGroupId d1GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
+ TRegionReplicaSet d1Replicas =
+ new TRegionReplicaSet(
+ d1GroupId.convertToTConsensusGroupId(),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(0)
+ .setInternalEndPoint(new TEndPoint("localhost", 10000)),
+ new TDataNodeLocation()
+ .setDataNodeId(1)
+ .setInternalEndPoint(new TEndPoint("localhost", 10001)),
+ new TDataNodeLocation()
+ .setDataNodeId(2)
+ .setInternalEndPoint(new TEndPoint("localhost", 10002))));
+
- WALRecoverManager.getInstance()
- .setAllDataRegionScannedLatch(new CountDownLatch(0));
- DataRegion dataRegion = new DataRegion(BASE_OUTPUT_PATH, d1GroupId.toString(),
- new DirectFlushPolicy(), "root.loadTest");
++ WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(0));
++ DataRegion dataRegion =
++ new DataRegion(
++ BASE_OUTPUT_PATH, d1GroupId.toString(), new DirectFlushPolicy(), "root.loadTest");
+ for (int i = 0; i < deviceNum; i++) {
+ partitionTable.put("d" + i, d1Replicas);
+ dataRegionMap.put(d1GroupId.convertToTConsensusGroupId(), dataRegion);
+ }
+
+ groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
+ ConsensusGroupId d2GroupId = Factory.create(TConsensusGroupType.DataRegion.getValue(), 1);
+ TRegionReplicaSet d2Replicas =
+ new TRegionReplicaSet(
+ d2GroupId.convertToTConsensusGroupId(),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(3)
+ .setInternalEndPoint(new TEndPoint("localhost", 10003)),
+ new TDataNodeLocation()
+ .setDataNodeId(4)
+ .setInternalEndPoint(new TEndPoint("localhost", 10004)),
+ new TDataNodeLocation()
+ .setDataNodeId(5)
+ .setInternalEndPoint(new TEndPoint("localhost", 10005))));
+ partitionTable.put("dd1", d2Replicas);
+ groupId2ReplicaSetMap.put(d2GroupId, d2Replicas);
+ }
+
+ public IPartitionFetcher dummyPartitionFetcher() {
+ return new IPartitionFetcher() {
+ @Override
+ public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+ return null;
+ }
+
+ @Override
- public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
++ public SchemaPartition getOrCreateSchemaPartition(
++ PathPatternTree patternTree, String userName) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return null;
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
- List<DataPartitionQueryParam> dataPartitionQueryParams) {
++ List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) {
+ return null;
+ }
+
+ @Override
+ public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
- PathPatternTree patternTree, Integer level) {
++ PathPatternTree patternTree, PathPatternTree scope, Integer level) {
+ return null;
+ }
+
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return false;
+ }
+
+ @Override
- public void invalidAllCache() {
- }
++ public void invalidAllCache() {}
+ };
+ }
+
+ public DataPartitionBatchFetcher dummyDataPartitionBatchFetcher() {
+ return new DataPartitionBatchFetcher(partitionFetcher) {
+ @Override
+ public List<TRegionReplicaSet> queryDataPartition(
- List<Pair<String, TTimePartitionSlot>> slotList) {
++ List<Pair<String, TTimePartitionSlot>> slotList, String userName) {
+ return slotList.stream().map(p -> partitionTable.get(p.left)).collect(Collectors.toList());
+ }
+ };
+ }
+
+ public void setupFiles() {
+ measurementSequenceGeneratorPairs.sort(Comparator.comparing(Pair::getLeft));
- List<Pair<MeasurementSchema, DoubleSequenceGeneratorFactory>> schemaGeneratorPairs = new ArrayList<>();
++ List<Pair<MeasurementSchema, DoubleSequenceGeneratorFactory>> schemaGeneratorPairs =
++ new ArrayList<>();
+ for (int sn = 0; sn < seriesNum; sn++) {
- Pair<String, DoubleSequenceGeneratorFactory> measurementGeneratorPair = measurementSequenceGeneratorPairs.get(
- sn % measurementSequenceGeneratorPairs.size());
++ Pair<String, DoubleSequenceGeneratorFactory> measurementGeneratorPair =
++ measurementSequenceGeneratorPairs.get(sn % measurementSequenceGeneratorPairs.size());
+ MeasurementSchema measurementSchema =
- new MeasurementSchema(measurementGeneratorPair
- .left + sn,
- TSDataType.DOUBLE);
++ new MeasurementSchema(measurementGeneratorPair.left + sn, TSDataType.DOUBLE);
+ measurementSchema.setCompressor(CompressionType.LZ4.serialize());
+ measurementSchema.setEncoding(TSEncoding.PLAIN.serialize());
+ schemaGeneratorPairs.add(new Pair<>(measurementSchema, measurementGeneratorPair.right));
+ }
+ schemaGeneratorPairs.sort(Comparator.comparing(s -> s.left.getMeasurementId()));
- List<MeasurementSchema> measurementSchemas = schemaGeneratorPairs.stream().map(m -> m.left)
- .collect(
- Collectors.toList());
++ List<MeasurementSchema> measurementSchemas =
++ schemaGeneratorPairs.stream().map(m -> m.left).collect(Collectors.toList());
+ IntStream.range(0, fileNum)
+ .parallel()
+ .forEach(
+ i -> {
+ try {
+ File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ synchronized (files) {
+ files.add(file);
+ }
+
+ try (TsFileWriter writer = new TsFileWriter(file)) {
+ // sn non-aligned series under d1 and 1 aligned series with sn measurements under
+ // dd2
+ for (int sn = 0; sn < seriesNum; sn++) {
+ for (int dn = 0; dn < deviceNum; dn++) {
- writer.registerTimeseries(new Path("d" + dn),
- schemaGeneratorPairs.get(sn).left);
++ writer.registerTimeseries(
++ new Path("d" + dn), schemaGeneratorPairs.get(sn).left);
+ }
+ }
+ writer.registerAlignedTimeseries(new Path("dd1"), measurementSchemas);
+
+ // one chunk for each series
+ long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
+ long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
+ int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+
+ Tablet tablet = new Tablet("d0", measurementSchemas, chunkPointNum);
+ for (int sn = 0; sn < seriesNum; sn++) {
+ DoubleSequenceGenerator sequenceGenerator =
- schemaGeneratorPairs.get(sn).right
- .create();
++ schemaGeneratorPairs.get(sn).right.create();
+ for (int pn = 0; pn < chunkPointNum; pn++) {
+ if (sn == 0) {
+ long currTime = chunkTimeRange * i + pointInterval * pn;
+ tablet.addTimestamp(pn, currTime);
+ }
- tablet.addValue(schemaGeneratorPairs.get(sn).left.getMeasurementId(), pn,
++ tablet.addValue(
++ schemaGeneratorPairs.get(sn).left.getMeasurementId(),
++ pn,
+ sequenceGenerator.gen(pn));
+ }
+ }
+
+ tablet.rowSize = chunkPointNum;
+ for (int dn = 0; dn < deviceNum; dn++) {
+ tablet.deviceId = "d" + dn;
+ writer.write(tablet);
+ }
+ tablet.deviceId = "d2";
+ // writer.writeAligned(tablet);
+
+ writer.flushAllChunkGroups();
+ for (int dn = 0; dn < deviceNum; dn++) {
+ tsFileResource.updateStartTime("d" + dn, chunkTimeRange * i);
+ tsFileResource.updateEndTime("d" + dn, chunkTimeRange * (i + 1));
+ }
+ }
+
+ tsFileResource.close();
+ synchronized (tsFileResources) {
+ tsFileResources.add(tsFileResource);
+ }
+ } catch (IOException | WriteProcessException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ tsFileResources.sort(Comparator.comparingLong(TsFileResource::getVersion));
+ }
+
+ public static String getTestTsFilePath(
+ String logicalStorageGroupName,
+ long VirtualStorageGroupId,
+ long TimePartitionId,
+ long tsFileVersion) {
+ String filePath =
+ String.format(
+ TEST_TSFILE_PATH, logicalStorageGroupName, VirtualStorageGroupId, TimePartitionId);
+ return TsFileGeneratorUtils.getTsFilePath(filePath, tsFileVersion);
+ }
+}
diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index a1509216852,00000000000..3f605a36247
mode 100644,000000..100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@@ -1,329 -1,0 +1,337 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
- import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
++import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
- import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.apache.thrift.TException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
++import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+import static org.junit.Assert.assertEquals;
+
+public class TsFileSplitSenderTest extends TestBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(TsFileSplitSenderTest.class);
+ protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>>
+ phaseOneResults = new ConcurrentSkipListMap<>();
+ // the third key is UUid, the value is command type
+ protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> phaseTwoResults =
+ new ConcurrentSkipListMap<>();
+ // simulating network delay and packet loss
- private long dummyDelayMS = 000;
++ private long dummyDelayMS = 0;
+ private double packetLossRatio = 0.00;
+ private Random random = new Random();
+ private long maxSplitSize = 128 * 1024 * 1024;
+ // simulating jvm stall like GC
+ private long minStuckIntervalMS = 50000;
+ private long maxStuckIntervalMS = 100000;
+ private long stuckDurationMS = 0;
+
+ private long nodeThroughput = 10_000;
+
+ protected Map<TEndPoint, Pair<Long, Long>> nextStuckTimeMap = new ConcurrentHashMap<>();
+ private AtomicLong sumHandleTime = new AtomicLong();
+ private AtomicLong decompressTime = new AtomicLong();
+ private AtomicLong deserializeTime = new AtomicLong();
+ private AtomicLong relayTime = new AtomicLong();
+ private AtomicLong maxMemoryUsage = new AtomicLong();
+
+ @Test
+ public void test() throws IOException {
- Thread thread = new Thread(() -> {
- while (!Thread.interrupted()) {
- long preUsage = maxMemoryUsage.get();
- long newUsage = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- if (preUsage < newUsage) {
- maxMemoryUsage.set(newUsage);
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- return;
- }
- }
- });
++ Thread thread =
++ new Thread(
++ () -> {
++ while (!Thread.interrupted()) {
++ long preUsage = maxMemoryUsage.get();
++ long newUsage =
++ Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
++ if (preUsage < newUsage) {
++ maxMemoryUsage.set(newUsage);
++ }
++ try {
++ Thread.sleep(100);
++ } catch (InterruptedException e) {
++ return;
++ }
++ }
++ });
+ thread.start();
+
+ LoadTsFileNode loadTsFileNode =
+ new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
+ DataPartitionBatchFetcher partitionBatchFetcher = dummyDataPartitionBatchFetcher();
+ TsFileSplitSender splitSender =
+ new TsFileSplitSender(
+ loadTsFileNode,
+ partitionBatchFetcher,
+ TimePartitionUtils.getTimePartitionInterval(),
+ internalServiceClientManager,
+ false,
+ maxSplitSize,
- 100);
++ 100,
++ "root");
+ long start = System.currentTimeMillis();
+ splitSender.start();
+ long timeConsumption = System.currentTimeMillis() - start;
+ thread.interrupt();
+
+ printPhaseResult();
+ long transmissionTime = splitSender.getStatistic().compressedSize.get() / nodeThroughput;
- System.out.printf("Split ends after %dms + %dms (Transmission) = %dms\n", timeConsumption,
- transmissionTime, timeConsumption + transmissionTime);
++ System.out.printf(
++ "Split ends after %dms + %dms (Transmission) = %dms\n",
++ timeConsumption, transmissionTime, timeConsumption + transmissionTime);
+ System.out.printf("Handle sum %dns\n", sumHandleTime.get());
+ System.out.printf("Decompress sum %dns\n", decompressTime.get());
+ System.out.printf("Deserialize sum %dns\n", deserializeTime.get());
+ System.out.printf("Relay sum %dns\n", relayTime.get());
+ System.out.printf("Memory usage %dMB\n", maxMemoryUsage.get() / MB);
+ }
+
+ public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint tEndpoint)
+ throws TException, IOException {
+ long handleStart = System.nanoTime();
+ if ((tEndpoint.getPort() - 10000) % 3 == 0
+ && random.nextDouble() < packetLossRatio
+ && req.isRelay) {
+ throw new TException("Packet lost");
+ }
+ if ((tEndpoint.getPort() - 10000) % 3 == 1
+ && random.nextDouble() < packetLossRatio / 2
+ && req.isRelay) {
+ throw new TException("Packet lost");
+ }
+
+ if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay && stuckDurationMS > 0) {
+ Pair<Long, Long> nextStuckTime =
+ nextStuckTimeMap.computeIfAbsent(
+ tEndpoint,
+ e ->
+ new Pair<>(
+ System.currentTimeMillis(), System.currentTimeMillis() + stuckDurationMS));
+ long currTime = System.currentTimeMillis();
+ if (currTime >= nextStuckTime.left && currTime < nextStuckTime.right) {
+ logger.debug("Node{} stalls", tEndpoint.getPort() - 10000);
+ try {
+ Thread.sleep(nextStuckTime.right - currTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else if (currTime > nextStuckTime.right) {
+ nextStuckTimeMap.compute(
+ tEndpoint,
+ (endPoint, newInterval) -> {
+ if (newInterval != null && currTime < newInterval.right) {
+ return newInterval;
+ }
+ long start =
+ currTime
+ + minStuckIntervalMS
+ + random.nextInt((int) (maxStuckIntervalMS - minStuckIntervalMS));
+ return new Pair<>(start, start + stuckDurationMS);
+ });
+ }
+ }
+
+ long decompressStart = System.nanoTime();
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ ByteBuffer buf = req.body.slice();
+ if (req.isSetCompressionType()) {
+ CompressionType compressionType = CompressionType.deserialize(req.compressionType);
+ IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType);
+ int uncompressedLength = req.getUncompressedLength();
+ ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
- unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(),
- allocate.array(), 0);
++ unCompressor.uncompress(
++ buf.array(), buf.arrayOffset() + buf.position(), buf.remaining(), allocate.array(), 0);
+ allocate.limit(uncompressedLength);
+ buf = allocate;
+ }
+ decompressTime.addAndGet(System.nanoTime() - decompressStart);
+
+ long deserializeStart = System.nanoTime();
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(buf);
+ deserializeTime.addAndGet(System.nanoTime() - deserializeStart);
+ Set<Integer> splitIds =
+ phaseOneResults
+ .computeIfAbsent(
+ tEndpoint,
+ e -> new ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
+ .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(pieceNode.getTsFile(), f -> new ConcurrentSkipListSet<>());
+ splitIds.addAll(
+ pieceNode.getAllTsFileData().stream()
+ .map(TsFileData::getSplitId)
+ .collect(Collectors.toList()));
+
+ if (dummyDelayMS > 0) {
+ if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
+ try {
+ Thread.sleep(dummyDelayMS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.isRelay) {
+ try {
+ Thread.sleep(dummyDelayMS / 2);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // forward to other replicas in the group
+ if (req.isRelay) {
+ long relayStart = System.nanoTime();
+ req.isRelay = false;
+ TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
- regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation -> {
- TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
- if (!otherPoint.equals(tEndpoint)) {
- try {
- handleTsFilePieceNode(req, otherPoint);
- } catch (TException | IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
++ regionReplicaSet.getDataNodeLocations().stream()
++ .parallel()
++ .forEach(
++ dataNodeLocation -> {
++ TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
++ if (!otherPoint.equals(tEndpoint)) {
++ try {
++ handleTsFilePieceNode(req, otherPoint);
++ } catch (TException | IOException e) {
++ throw new RuntimeException(e);
++ }
++ }
++ });
+ relayTime.addAndGet(System.nanoTime() - relayStart);
+ }
+
+ sumHandleTime.addAndGet(System.nanoTime() - handleStart);
+ return new TLoadResp()
+ .setAccepted(true)
+ .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint tEndpoint) {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ phaseTwoResults
+ .computeIfAbsent(
+ tEndpoint,
+ e -> new ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
+ .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
+ .computeIfAbsent(req.uuid, id -> req.commandType);
+
+ // forward to other replicas in the group
+ if (req.useConsensus) {
+ req.useConsensus = false;
+ TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+ if (!otherPoint.equals(tEndpoint)) {
+ handleTsLoadCommand(req, otherPoint);
+ }
+ }
+ }
+
+ return new TLoadResp()
+ .setAccepted(true)
+ .setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ public void printPhaseResult() {
+ System.out.print("Phase one:\n");
+ for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>>>
+ tEndPointMapEntry : phaseOneResults.entrySet()) {
+ TEndPoint endPoint = tEndPointMapEntry.getKey();
+ for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>> consensusGroupIdMapEntry :
+ tEndPointMapEntry.getValue().entrySet()) {
+ ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
+ for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
+ consensusGroupIdMapEntry.getValue().entrySet()) {
+ String uuid = stringMapEntry.getKey();
+ for (Entry<File, Set<Integer>> fileListEntry : stringMapEntry.getValue().entrySet()) {
+ File tsFile = fileListEntry.getKey();
+ Set<Integer> chunks = fileListEntry.getValue();
+ System.out.printf(
+ "%s - %s - %s - %s - %s chunks\n",
+ endPoint, consensusGroupId, uuid, tsFile, chunks.size());
+ // if (consensusGroupId.getId() == 0) {
+ // // d1, non-aligned series
+ // assertEquals(expectedChunkNum() / 2, chunks.size());
+ // } else {
+ // // d2, aligned series
+ // assertEquals(expectedChunkNum() / 2 / seriesNum, chunks.size());
+ // }
+ }
+ }
+ }
+ }
+
+ System.out.print("Phase two:\n");
+ for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> tEndPointMapEntry :
+ phaseTwoResults.entrySet()) {
+ TEndPoint endPoint = tEndPointMapEntry.getKey();
+ for (Entry<ConsensusGroupId, Map<String, Integer>> consensusGroupIdMapEntry :
+ tEndPointMapEntry.getValue().entrySet()) {
+ ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
+ for (Entry<String, Integer> stringMapEntry :
+ consensusGroupIdMapEntry.getValue().entrySet()) {
+ String uuid = stringMapEntry.getKey();
+ int command = stringMapEntry.getValue();
+ System.out.printf(
+ "%s - %s - %s - %s\n",
+ endPoint, consensusGroupId, uuid, LoadCommand.values()[command]);
+ assertEquals(LoadCommand.EXECUTE.ordinal(), command);
+ }
+ }
+ }
+ }
+}
diff --cc iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
index 00000000000,630b60ce20a..3b24e32baf0
mode 000000,100644..100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
@@@ -1,0 -1,79 +1,83 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.iotdb.commons.utils;
+
+ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+ import org.apache.iotdb.commons.conf.CommonDescriptor;
+ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+ public class TimePartitionUtils {
+
+ /** Time range for dividing database, the time unit is the same with IoTDB's TimestampPrecision */
+ private static long timePartitionInterval =
+ CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
+
+ public static TTimePartitionSlot getTimePartitionSlot(long time) {
++ return getTimePartitionSlot(time, timePartitionInterval);
++ }
++
++ public static TTimePartitionSlot getTimePartitionSlot(long time, long timePartitionInterval) {
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+ if (time > 0 || time % timePartitionInterval == 0) {
+ timePartitionSlot.setStartTime(time / timePartitionInterval * timePartitionInterval);
+ } else {
+ timePartitionSlot.setStartTime((time / timePartitionInterval - 1) * timePartitionInterval);
+ }
+ return timePartitionSlot;
+ }
+
+ public static long getTimePartitionInterval() {
+ return timePartitionInterval;
+ }
+
+ public static long getTimePartitionUpperBound(long time) {
+ long upperBoundOfTimePartition;
+ if (time > 0 || time % TimePartitionUtils.timePartitionInterval == 0) {
+ upperBoundOfTimePartition =
+ (time / TimePartitionUtils.timePartitionInterval + 1)
+ * TimePartitionUtils.timePartitionInterval;
+ } else {
+ upperBoundOfTimePartition =
+ (time / TimePartitionUtils.timePartitionInterval)
+ * TimePartitionUtils.timePartitionInterval;
+ }
+ return upperBoundOfTimePartition;
+ }
+
+ public static long getTimePartitionId(long time) {
+ return time > 0 || time % timePartitionInterval == 0
+ ? time / timePartitionInterval
+ : time / timePartitionInterval - 1;
+ }
+
+ public static boolean satisfyPartitionId(long startTime, long endTime, long partitionId) {
+ return getTimePartitionId(startTime) <= partitionId
+ && getTimePartitionId(endTime) >= partitionId;
+ }
+
+ public static boolean satisfyPartitionStartTime(Filter timeFilter, long partitionStartTime) {
+ return timeFilter == null
+ || timeFilter.satisfyStartEndTime(
+ partitionStartTime, partitionStartTime + timePartitionInterval);
+ }
+
+ public static void setTimePartitionInterval(long timePartitionInterval) {
+ TimePartitionUtils.timePartitionInterval = timePartitionInterval;
+ }
+ }