You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/04/27 11:31:41 UTC

[iotdb] branch ca updated (3e58e30f49 -> b262743d23)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch ca
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard 3e58e30f49 reduce rpc count for dispatch and thread count for internal service
 discard 18baab5d01 fix prometheus timeout
 discard 6c1556a293 improve iot
     add 104a8bf26f Fix concurrent state change bug in QueryStateMachine
     add 969995276b Combine DataNodeSchemaCache of Template and Non-Template Scenarios  (#9687)
     add ce843c2c06 [IOTDB-5779] PipeConnector reuse strategy based on reference count mechanism (#9629)
     add 4020214423 [IoTDB-5721] Streaming query DataPartition and Schema while loading TsFile (#9684)
     add 29e7e1a851 [IOTDB-5662] Fix BufferedUnderflowException occurs in inner space compaction (#9322)
     add 04a074b4ac [IOTDB-5841] Remove duplicate schema template IT (#9693)
     add e1103a96d6 [IOTDB-5183] Use default snapshot installation path for confignode & schema region
     add 7d4fdc781f [IOTDB-5812] Reduce useless create of PartialPath in auth module (#9691)
     add 809431fd7e Bump yaml from 2.2.1 to 2.2.2 in /site/src/main (#9705)
     add b6f78dcee2 Support update last cache for data insertion when using template (#9696)
     add 8d7ca050cc [IOTDB-5807]Fix decompression error for aligned series in fast compaction (#9701)
     add 86be5ea16a [IOTDB-5803] Improve query performance by reducing cpu consuming
     add c7a6184c01  [IOTDB-5819] Fix npe when booting net metrics (#9698)
     add f77f3fd572 [IOTDB-5704] Change default parameters for wal part (#9394)
     add 0a15a9020f Correct ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL name
     add 33f5f6ec78 [IOTDB-5824] Fix show devices with * cannot display satisfied devices (#9715)
     add 5d77c446ea [IOTDB-5815] Fix Npe when UDF spilling data to disk
     add ed8d539602 [IOTDB-5739] Pipe realtime event process: listener -> assigner -> matcher -> collector (#9479)
     add b5f5d7465c Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
     add c2ce85a3c8 Simplify collect redirect Info logic (#9675)
     add 56bcde6d15 [IOTDB-5826] Fix schema query with * cannot display satisfied template series (#9723)
     add 89b2b96a6f Change default multi_dir_strategy to SequenceStrategy and fix original bug (#9718)
     new 518d4eec5e fix prometheus timeout
     new b262743d23 reduce rpc count for dispatch and thread count for internal service

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3e58e30f49)
            \
             N -- N -- N   refs/heads/ca (b262743d23)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../consensus/request/auth/AuthorPlan.java         |   29 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   13 +-
 .../apache/iotdb/confignode/manager/IManager.java  |    2 +-
 .../confignode/manager/PermissionManager.java      |    3 +-
 .../iotdb/confignode/persistence/AuthorInfo.java   |   34 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   10 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |    8 +-
 .../confignode/persistence/AuthorInfoTest.java     |   30 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |   17 -
 .../consensus/iot/logdispatcher/LogDispatcher.java |   42 +-
 .../iotdb/consensus/ratis/SnapshotStorage.java     |    8 +-
 .../confignode/it/IoTDBClusterAuthorityIT.java     |   88 +-
 .../db/it/last/IoTDBLastQueryLastCache2IT.java     |   60 +
 .../db/it/last/IoTDBLastQueryLastCacheIT.java      |  145 +
 .../db/it/schema/IoTDBAutoCreateSchemaIT.java      |    1 +
 .../it/schema/IoTDBCreateAlignedTimeseriesIT.java  |    1 +
 .../db/it/schema/IoTDBCreateStorageGroupIT.java    |    1 +
 .../db/it/schema/IoTDBCreateTimeseriesIT.java      |    1 +
 .../db/it/schema/IoTDBDeactivateTemplateIT.java    |    1 +
 .../it/schema/IoTDBDeleteAlignedTimeseriesIT.java  |    1 +
 .../db/it/schema/IoTDBDeleteStorageGroupIT.java    |    1 +
 .../db/it/schema/IoTDBDeleteTimeseriesIT.java      |    1 +
 .../iotdb/db/it/schema/IoTDBExtendTemplateIT.java  |    1 +
 .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java   |   50 +
 .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java  |  115 +-
 .../db/it/schema/IoTDBSortedShowTimeseriesIT.java  |    1 +
 .../apache/iotdb/db/it/schema/IoTDBTagAlterIT.java |    1 +
 .../org/apache/iotdb/db/it/schema/IoTDBTagIT.java  |    1 +
 .../session/it/IoTDBSessionSchemaTemplateIT.java   |   13 +-
 .../{db/it/schema => util}/AbstractSchemaIT.java   |    4 +-
 .../iotdb/metrics/metricsets/net/NetMetrics.java   |   10 +-
 .../resources/conf/iotdb-common.properties         |   19 +-
 .../commons/auth/authorizer/BasicAuthorizer.java   |   35 +-
 .../iotdb/commons/auth/authorizer/IAuthorizer.java |   33 +-
 .../commons/auth/authorizer/OpenIdAuthorizer.java  |    3 +-
 .../iotdb/commons/auth/entity/PathPrivilege.java   |   29 +-
 .../org/apache/iotdb/commons/auth/entity/Role.java |   13 +-
 .../org/apache/iotdb/commons/auth/entity/User.java |   13 +-
 .../iotdb/commons/auth/role/BasicRoleManager.java  |    5 +-
 .../iotdb/commons/auth/role/IRoleManager.java      |    6 +-
 .../iotdb/commons/auth/user/BasicUserManager.java  |    5 +-
 .../iotdb/commons/auth/user/IUserManager.java      |    6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |    2 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   10 +
 .../iotdb/commons/conf/CommonDescriptor.java       |    8 +
 .../org/apache/iotdb/commons/path/AlignedPath.java |    6 +
 .../org/apache/iotdb/commons/path/PartialPath.java |   40 +
 .../builtin/connector/DoNothingConnector.java      |    6 +-
 .../builtin/processor/DoNothingProcessor.java      |    6 +-
 .../commons/pipe/task/meta/PipeStaticMeta.java     |   24 +-
 .../commons/udf/service/UDFClassLoaderManager.java |    6 +-
 .../org/apache/iotdb/commons/utils/AuthUtils.java  |   96 +-
 .../org/apache/iotdb/commons/utils/IOUtils.java    |    8 +-
 .../apache/iotdb/commons/utils/JVMCommonUtils.java |   12 +-
 .../org/apache/iotdb/pipe/api/PipeCollector.java   |   99 +
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |    6 +-
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |    6 +-
 .../iotdb/pipe/api/collector/EventCollector.java   |    6 +-
 .../iotdb/pipe/api/collector/RowCollector.java     |    2 +-
 .../PipeCollectorRuntimeConfiguration.java         |   13 +-
 .../org/apache/iotdb/pipe/api/event/Event.java     |    6 +-
 .../org/apache/iotdb/pipe/api/event/EventType.java |    8 +-
 .../event/{ => dml}/deletion/DeletionEvent.java    |    8 +-
 .../{ => dml}/insertion/TabletInsertionEvent.java  |    8 +-
 .../{ => dml}/insertion/TsFileInsertionEvent.java  |    8 +-
 pom.xml                                            |    6 +
 server/pom.xml                                     |    6 +-
 .../resources/conf/iotdb-datanode.properties       |    2 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   10 +-
 .../apache/iotdb/db/auth/AuthorizerManager.java    |   15 +-
 .../iotdb/db/auth/ClusterAuthorityFetcher.java     |   38 +-
 .../apache/iotdb/db/auth/IAuthorityFetcher.java    |    3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   16 +-
 .../directories/strategy/SequenceStrategy.java     |    2 +-
 .../performer/impl/FastCompactionPerformer.java    |   11 +-
 .../impl/ReadPointCompactionPerformer.java         |    3 +
 .../execute/utils/MultiTsFileDeviceIterator.java   |    4 +-
 .../fast/AlignedSeriesCompactionExecutor.java      |    3 +-
 .../readchunk/SingleSeriesCompactionExecutor.java  |   38 +-
 .../utils/writer/AbstractCompactionWriter.java     |    7 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   62 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   17 +-
 .../db/metadata/cache/DataNodeSchemaCache.java     |  225 +-
 ...he.java => DeviceUsingTemplateSchemaCache.java} |   45 +-
 ...SchemaCache.java => TimeSeriesSchemaCache.java} |  101 +-
 .../metadata/cache/dualkeycache/IDualKeyCache.java |    4 +
 .../dualkeycache/impl/CacheEntryGroupImpl.java     |    2 +-
 .../cache/dualkeycache/impl/DualKeyCacheImpl.java  |  102 +-
 .../metadata/template/ClusterTemplateManager.java  |   10 +-
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |   11 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |    2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |   78 +-
 .../execution/exchange/sink/LocalSinkChannel.java  |   12 +-
 .../mpp/execution/exchange/sink/SinkChannel.java   |   15 +-
 .../execution/exchange/source/SourceHandle.java    |   25 +-
 .../operator/process/TransformOperator.java        |   13 +-
 .../mpp/execution/schedule/DriverTaskThread.java   |    7 +-
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   |  140 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |   19 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   73 +-
 .../db/mpp/plan/analyze/cache/PartitionCache.java  |   10 +-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   59 +-
 .../db/mpp/plan/analyze/schema/ISchemaFetcher.java |    2 -
 .../plan/analyze/schema/NormalSchemaFetcher.java   |    5 +-
 .../plan/analyze/schema/TemplateSchemaFetcher.java |   10 +-
 .../db/mpp/plan/execution/QueryExecution.java      |    9 +-
 .../visitor/ColumnTransformerVisitor.java          |    2 +-
 .../visitor/IntermediateLayerVisitor.java          |    4 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |   20 +-
 .../plan/planner/distribution/SourceRewriter.java  |   11 +
 .../plan/node/load/LoadSingleTsFileNode.java       |   59 +-
 .../planner/plan/node/load/LoadTsFileNode.java     |   16 +-
 .../planner/plan/node/write/InsertRowNode.java     |    4 +
 .../planner/plan/node/write/InsertRowsNode.java    |    5 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |    7 +
 .../planner/plan/node/write/InsertTabletNode.java  |    8 +
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   20 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  201 +-
 .../plan/statement/crud/InsertBaseStatement.java   |    4 -
 .../crud/InsertMultiTabletsStatement.java          |   18 -
 .../plan/statement/crud/InsertRowStatement.java    |   17 +-
 .../crud/InsertRowsOfOneDeviceStatement.java       |    9 -
 .../plan/statement/crud/InsertRowsStatement.java   |   17 -
 .../plan/statement/crud/InsertTabletStatement.java |   13 -
 .../dag/builder/EvaluationDAGBuilder.java          |    4 +-
 .../dag/input/QueryDataSetInputLayer.java          |    5 +-
 .../intermediate/ConstantIntermediateLayer.java    |    3 +-
 .../dag/intermediate/IntermediateLayer.java        |    4 +-
 .../MultiInputColumnIntermediateLayer.java         |    2 +-
 ...InputColumnMultiReferenceIntermediateLayer.java |    2 +-
 ...nputColumnSingleReferenceIntermediateLayer.java |    2 +-
 .../db/mpp/transformation/dag/udf/UDTFContext.java |    2 +-
 .../mpp/transformation/dag/udf/UDTFExecutor.java   |    2 +-
 .../datastructure/SerializableList.java            |    6 +-
 .../row/ElasticSerializableRowRecordList.java      |    6 +-
 .../row/SerializableRowRecordList.java             |    2 +-
 .../tv/ElasticSerializableBinaryTVList.java        |    2 +-
 .../tv/ElasticSerializableTVList.java              |    8 +-
 .../datastructure/tv/SerializableTVList.java       |    2 +-
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |    6 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   34 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   17 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |   19 +-
 .../apache/iotdb/db/pipe/config/PipeConfig.java    |   65 +
 .../core/collector/PipeCollectorEventSelector.java |   22 -
 .../collector/realtime/PipeRealtimeCollector.java  |   22 -
 .../realtime/PipeRealtimeDataRegionCollector.java  |   81 +
 .../PipeRealtimeHybridDataRegionCollector.java     |  171 ++
 .../realtime/assigner/DisruptorQueue.java          |  111 +
 .../realtime/assigner/PipeDataRegionAssigner.java  |   74 +
 .../realtime/cache/PipeRealtimeEventCache.java     |   22 -
 .../realtime/listener/IoTLogListerner.java         |   22 -
 .../listener/PipeInsertionDataNodeListener.java    |  115 +
 .../realtime/listener/RatisLogListener.java        |   22 -
 .../realtime/listener/SimpleLogListener.java       |   22 -
 .../listener/TsFileGenerationListener.java         |   22 -
 .../matcher/CachedSchemaPatternMatcher.java        |  200 ++
 .../realtime/matcher/PipeDataRegionMatcher.java    |   47 +
 .../pipe/core/collector/realtime/matcher/Rule.java |   22 -
 .../realtime/matcher/RulePrefixMatchTree.java      |   22 -
 .../collector/realtime/recorder/TsFileEpoch.java   |   22 -
 .../realtime/recorder/TsFileEpochRecorder.java     |   22 -
 .../core/connector/PipeConnectorContainer.java     |   22 -
 .../pipe/core/connector/PipeConnectorManager.java  |   22 -
 .../connector/PipeConnectorSubtaskLifeCycle.java   |   98 +
 .../connector/PipeConnectorSubtaskManager.java     |  107 +
 .../event/{ => impl}/PipeTabletInsertionEvent.java |   22 +-
 .../event/{ => impl}/PipeTsFileInsertionEvent.java |   23 +-
 .../pipe/core/event/indexer/PipeEventIndexer.java  |   22 -
 .../core/event/indexer/PipeIoTEventIndexer.java    |   22 -
 .../core/event/indexer/PipeRatisEventIndexer.java  |   22 -
 .../core/event/indexer/PipeSimpleEventIndexer.java |   22 -
 .../core/event/indexer/PipeTsFileEventIndexer.java |   22 -
 .../event/realtime/PipeRealtimeCollectEvent.java   |   66 +
 .../realtime/PipeRealtimeCollectEventFactory.java  |   49 +
 .../db/pipe/core/event/realtime/TsFileEpoch.java   |   68 +
 .../core/event/realtime/TsFileEpochManager.java    |   70 +
 .../realtime/TsFileEpochStateMigrator.java}        |    7 +-
 .../pipe/core/event/{ => view}/access/PipeRow.java |    2 +-
 .../event/{ => view}/access/PipeRowIterator.java   |    2 +-
 .../{ => view}/collector/PipeEventCollector.java   |    8 +-
 .../{ => view}/collector/PipeRowCollector.java     |    2 +-
 .../execution/executor/PipeSubtaskExecutor.java    |   12 +-
 .../PipeResourceManager.java}                      |   28 +-
 .../pipe/task/callable/PipeConnectorSubtask.java   |   37 -
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |   41 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  109 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   38 +-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |   34 +-
 .../task/{callable => subtask}/DecoratingLock.java |    2 +-
 .../{callable => subtask}/PipeAssignerSubtask.java |    7 +-
 .../subtask/PipeConnectorSubtask.java}             |   50 +-
 .../subtask/PipeProcessorSubtask.java}             |   44 +-
 .../task/{callable => subtask}/PipeSubtask.java    |    8 +-
 .../iotdb/db/query/context/QueryContext.java       |    2 +
 .../db/query/control/QueryResourceManager.java     |    7 +-
 .../query/control/clientsession/ClientSession.java |    3 +-
 .../db/service/TemporaryQueryDataFileService.java  |   14 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   15 +-
 .../iotdb/db/auth/AuthorizerManagerTest.java       |   26 +-
 .../auth/authorizer/LocalFileAuthorizerTest.java   |    9 +-
 .../iotdb/db/auth/entity/PathPrivilegeTest.java    |    8 +-
 .../org/apache/iotdb/db/auth/entity/RoleTest.java  |    8 +-
 .../org/apache/iotdb/db/auth/entity/UserTest.java  |    8 +-
 .../db/auth/role/LocalFileRoleAccessorTest.java    |    6 +-
 .../db/auth/role/LocalFileRoleManagerTest.java     |    8 +-
 .../db/auth/user/LocalFileUserAccessorTest.java    |    6 +-
 .../db/auth/user/LocalFileUserManagerTest.java     |    8 +-
 .../engine/compaction/AbstractCompactionTest.java  |    3 +
 .../compaction/FastAlignedCrossCompactionTest.java |  751 +++--
 .../FastNonAlignedCrossCompactionTest.java         |  767 ++++--
 .../compaction/ReadChunkInnerCompactionTest.java   |  377 +++
 ...va => ReadPointAlignedCrossCompactionTest.java} |  804 ++++--
 ...=> ReadPointNonAlignedCrossCompactionTest.java} |  820 ++++--
 .../ReadChunkCompactionPerformerNoAlignedTest.java |  274 ++
 .../utils/CompactionFileGeneratorUtils.java        |   60 +
 .../compaction/utils/TsFileGeneratorUtils.java     |  116 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |   50 +
 .../db/mpp/execution/QueryStateMachineTest.java    |    2 +
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |    3 -
 .../iotdb/db/mpp/plan/plan/distribution/Util.java  |    3 -
 .../plan/node/write/WritePlanNodeSplitTest.java    |   15 +-
 .../collector/CachedSchemaPatternMatcherTest.java  |  149 +
 .../core/collector/PipeRealtimeCollectTest.java    |  236 ++
 .../executor/PipeAssignerSubtaskExecutorTest.java  |    2 +-
 .../executor/PipeConnectorSubtaskExecutorTest.java |    6 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |   12 +-
 .../executor/PipeSubtaskExecutorTest.java          |    2 +-
 .../AlignedSeriesScanLimitOffsetPushDownTest.java  |   36 +-
 .../series/SeriesScanLimitOffsetPushDownTest.java  |   23 +-
 .../udf/datastructure/SerializableListTest.java    |    2 +-
 .../security/encrypt/MessageDigestEncryptTest.java |    6 +-
 server/src/test/resources/logback-test.xml         |    1 +
 site/src/main/package-lock.json                    | 2881 +++++++-------------
 .../src/main/thrift/confignode.thrift              |    4 +-
 thrift/src/main/thrift/client.thrift               |    3 +
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |    2 +-
 .../read/reader/chunk/AlignedChunkReader.java      |   40 +-
 .../tsfile/read/reader/chunk/ChunkReader.java      |    4 +
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |   42 +-
 240 files changed, 8359 insertions(+), 4498 deletions(-)
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
 rename integration-test/src/test/java/org/apache/iotdb/{db/it/schema => util}/AbstractSchemaIT.java (98%)
 create mode 100644 pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
 copy server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java => pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/collector/PipeCollectorRuntimeConfiguration.java (70%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java => pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/EventType.java (87%)
 rename pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/{ => dml}/deletion/DeletionEvent.java (87%)
 rename pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/{ => dml}/insertion/TabletInsertionEvent.java (91%)
 rename pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/{ => dml}/insertion/TsFileInsertionEvent.java (89%)
 rename server/src/main/java/org/apache/iotdb/db/metadata/cache/{DataNodeTemplateSchemaCache.java => DeviceUsingTemplateSchemaCache.java} (81%)
 copy server/src/main/java/org/apache/iotdb/db/metadata/cache/{DataNodeSchemaCache.java => TimeSeriesSchemaCache.java} (79%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeHybridDataRegionCollector.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/event/{ => impl}/PipeTabletInsertionEvent.java (68%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/event/{ => impl}/PipeTsFileInsertionEvent.java (65%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpoch.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/{collector/PipeCollectorEventPendingQueue.java => event/realtime/TsFileEpochStateMigrator.java} (81%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/event/{ => view}/access/PipeRow.java (97%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/event/{ => view}/access/PipeRowIterator.java (96%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/event/{ => view}/collector/PipeEventCollector.java (82%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/event/{ => view}/collector/PipeRowCollector.java (94%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/{task/callable/PipeProcessorSubtask.java => resource/PipeResourceManager.java} (59%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/task/callable/PipeConnectorSubtask.java
 rename server/src/main/java/org/apache/iotdb/db/pipe/task/{callable => subtask}/DecoratingLock.java (96%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/task/{callable => subtask}/PipeAssignerSubtask.java (91%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/{core/connector/PipeConnectorPluginRuntimeWrapper.java => task/subtask/PipeConnectorSubtask.java} (58%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/{core/processor/PipeProcessorPluginRuntimeWrapper.java => task/subtask/PipeProcessorSubtask.java} (65%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/task/{callable => subtask}/PipeSubtask.java (95%)
 create mode 100644 server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadChunkInnerCompactionTest.java
 copy server/src/test/java/org/apache/iotdb/db/engine/compaction/{FastAlignedCrossCompactionTest.java => ReadPointAlignedCrossCompactionTest.java} (86%)
 copy server/src/test/java/org/apache/iotdb/db/engine/compaction/{FastNonAlignedCrossCompactionTest.java => ReadPointNonAlignedCrossCompactionTest.java} (85%)
 create mode 100644 server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java


[iotdb] 01/02: fix prometheus timeout

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch ca
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 518d4eec5e757af11d876c1a133deaf9d034de41
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Mon Apr 24 00:23:54 2023 +0800

    fix prometheus timeout
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../metricsets/net/LinuxNetMetricManager.java      | 30 ++++++++++------------
 .../iotdb/db/service/metrics/FileMetrics.java      |  2 +-
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/LinuxNetMetricManager.java b/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/LinuxNetMetricManager.java
index 098463dea4..643954c49a 100644
--- a/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/LinuxNetMetricManager.java
+++ b/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/LinuxNetMetricManager.java
@@ -25,11 +25,9 @@ import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -216,19 +214,19 @@ public class LinuxNetMetricManager implements INetMetricManager {
     }
 
     // update socket num
-    try {
-      Process process = Runtime.getRuntime().exec(this.getConnectNumCmd);
-      StringBuilder result = new StringBuilder();
-      try (BufferedReader input =
-          new BufferedReader(new InputStreamReader(process.getInputStream()))) {
-        String line;
-        while ((line = input.readLine()) != null) {
-          result.append(line);
-        }
-      }
-      this.connectionNum = Integer.parseInt(result.toString().trim());
-    } catch (IOException e) {
-      log.error("Failed to get socket num", e);
-    }
+    //    try {
+    //      Process process = Runtime.getRuntime().exec(this.getConnectNumCmd);
+    //      StringBuilder result = new StringBuilder();
+    //      try (BufferedReader input =
+    //          new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+    //        String line;
+    //        while ((line = input.readLine()) != null) {
+    //          result.append(line);
+    //        }
+    //      }
+    //      this.connectionNum = Integer.parseInt(result.toString().trim());
+    //    } catch (IOException e) {
+    //      log.error("Failed to get socket num", e);
+    //    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
index 483c2b7345..1829ebddd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java
@@ -173,7 +173,7 @@ public class FileMetrics implements IMetricSet {
           };
       metricService.createAutoGauge(
           Metric.FILE_COUNT.toString(),
-          MetricLevel.CORE,
+          MetricLevel.NORMAL,
           this,
           FileMetrics::getOpenFileHandlersNumber,
           Tag.NAME.toString(),


[iotdb] 02/02: reduce rpc count for dispatch and thread count for internal service

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch ca
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b262743d23055cac76f67329b643c9b9786f8a57
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Mon Apr 24 00:59:29 2023 +0800

    reduce rpc count for dispatch and thread count for internal service
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 54 ++++++++++++++--------
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   | 27 ++++++-----
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 20 +++++---
 .../impl/DataNodeInternalRPCServiceImpl.java       | 35 ++++++++------
 .../DataNodeInternalRPCServiceImplTest.java        | 31 ++++++++-----
 thrift/src/main/thrift/datanode.thrift             | 14 ++++--
 6 files changed, 117 insertions(+), 64 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 29a5f80f62..0d6406c3f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -25,15 +25,18 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,38 +46,53 @@ import java.util.concurrent.atomic.AtomicLong;
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class AsyncPlanNodeSender {
+
   private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
   private final List<FragmentInstance> instances;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+
+  private final Map<TEndPoint, Pair<List<Integer>, TSendPlanNodeBatchReq>> batchRequests;
+  private final Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap;
   private final AtomicLong pendingNumber;
+  private final long startSendTime;
 
   public AsyncPlanNodeSender(
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager,
       List<FragmentInstance> instances) {
+    this.startSendTime = System.nanoTime();
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.instances = instances;
-    this.instanceId2RespMap = new ConcurrentHashMap<>();
-    this.pendingNumber = new AtomicLong(instances.size());
+    this.batchRequests = new HashMap<>(instances.size());
+    for (int i = 0; i < instances.size(); i++) {
+      Pair<List<Integer>, TSendPlanNodeBatchReq> value =
+          this.batchRequests.computeIfAbsent(
+              instances.get(i).getHostDataNode().getInternalEndPoint(),
+              x -> new Pair<>(new ArrayList<>(), new TSendPlanNodeBatchReq()));
+      value.getLeft().add(i);
+      value
+          .getRight()
+          .addToRequests(
+              new TSendPlanNodeSingleReq(
+                  new TPlanNode(
+                      instances.get(i).getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                  instances.get(i).getRegionReplicaSet().getRegionId()));
+    }
+    this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size());
+    this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
   }
 
   public void sendAll() {
-    long startSendTime = System.nanoTime();
-    for (int i = 0; i < instances.size(); ++i) {
-      FragmentInstance instance = instances.get(i);
+    for (Map.Entry<TEndPoint, Pair<List<Integer>, TSendPlanNodeBatchReq>> entry :
+        batchRequests.entrySet()) {
       AsyncSendPlanNodeHandler handler =
-          new AsyncSendPlanNodeHandler(i, pendingNumber, instanceId2RespMap, startSendTime);
+          new AsyncSendPlanNodeHandler(
+              entry.getValue().getLeft(), pendingNumber, instanceId2RespMap, startSendTime);
       try {
-        TSendPlanNodeReq sendPlanNodeReq =
-            new TSendPlanNodeReq(
-                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                instance.getRegionReplicaSet().getRegionId());
         AsyncDataNodeInternalServiceClient client =
-            asyncInternalServiceClientManager.borrowClient(
-                instance.getHostDataNode().getInternalEndPoint());
-        client.sendPlanNode(sendPlanNodeReq, handler);
+            asyncInternalServiceClientManager.borrowClient(entry.getKey());
+        client.sendPlanNode(entry.getValue().getRight(), handler);
       } catch (Exception e) {
         handler.onError(e);
       }
@@ -92,7 +110,7 @@ public class AsyncPlanNodeSender {
   public List<TSStatus> getFailureStatusList() {
     List<TSStatus> failureStatusList = new ArrayList<>();
     TSStatus status;
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendPlanNodeSingleResp> entry : instanceId2RespMap.entrySet()) {
       status = entry.getValue().getStatus();
       if (!entry.getValue().accepted) {
         if (status == null) {
@@ -122,7 +140,7 @@ public class AsyncPlanNodeSender {
   }
 
   public Future<FragInstanceDispatchResult> getResult() {
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendPlanNodeSingleResp> entry : instanceId2RespMap.entrySet()) {
       if (!entry.getValue().accepted) {
         logger.warn(
             "dispatch write failed. status: {}, code: {}, message: {}, node {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
index 2bd50a6c7e..eab4bb5ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -19,37 +19,42 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeResp> {
-  private final int instanceId;
+public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeBatchResp> {
+
+  private final List<Integer> instanceIds;
   private final AtomicLong pendingNumber;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+  private final Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap;
   private final long sendTime;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
   public AsyncSendPlanNodeHandler(
-      int instanceId,
+      List<Integer> instanceIds,
       AtomicLong pendingNumber,
-      Map<Integer, TSendPlanNodeResp> instanceId2RespMap,
+      Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap,
       long sendTime) {
-    this.instanceId = instanceId;
+    this.instanceIds = instanceIds;
     this.pendingNumber = pendingNumber;
     this.instanceId2RespMap = instanceId2RespMap;
     this.sendTime = sendTime;
   }
 
   @Override
-  public void onComplete(TSendPlanNodeResp tSendPlanNodeResp) {
-    instanceId2RespMap.put(instanceId, tSendPlanNodeResp);
+  public void onComplete(TSendPlanNodeBatchResp tSendPlanNodeResp) {
+    for (int i = 0; i < tSendPlanNodeResp.getResponses().size(); i++) {
+      instanceId2RespMap.put(instanceIds.get(i), tSendPlanNodeResp.getResponses().get(i));
+    }
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
@@ -60,13 +65,13 @@ public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNo
 
   @Override
   public void onError(Exception e) {
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
+    TSendPlanNodeSingleResp resp = new TSendPlanNodeSingleResp();
     String errorMsg = String.format("Fail to send plan node, exception message: %s", e);
     resp.setAccepted(false);
     resp.setMessage(errorMsg);
     resp.setStatus(
         RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg));
-    instanceId2RespMap.put(instanceId, resp);
+    instanceIds.forEach(instanceId -> instanceId2RespMap.put(instanceId, resp));
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a471e9256a..9ba7e1d421 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -42,8 +42,9 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -52,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -268,11 +270,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           }
           break;
         case WRITE:
-          TSendPlanNodeReq sendPlanNodeReq =
-              new TSendPlanNodeReq(
-                  new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                  instance.getRegionReplicaSet().getRegionId());
-          TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
+          TSendPlanNodeBatchReq sendPlanNodeReq =
+              new TSendPlanNodeBatchReq(
+                  Collections.singletonList(
+                      new TSendPlanNodeSingleReq(
+                          new TPlanNode(
+                              instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                          instance.getRegionReplicaSet().getRegionId())));
+          TSendPlanNodeSingleResp sendPlanNodeResp =
+              client.sendPlanNode(sendPlanNodeReq).getResponses().get(0);
           if (!sendPlanNodeResp.accepted) {
             logger.warn(
                 "dispatch write failed. status: {}, code: {}, message: {}, node {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7f45d9b5e9..7bd979506d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -175,8 +175,9 @@ import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
@@ -289,18 +290,24 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
-    LOGGER.debug("receive PlanNode to group[{}]", req.getConsensusGroupId());
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
-    RegionWriteExecutor executor = new RegionWriteExecutor();
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
-    RegionExecutionResult executionResult = executor.execute(groupId, planNode);
-    resp.setAccepted(executionResult.isAccepted());
-    resp.setMessage(executionResult.getMessage());
-    resp.setStatus(executionResult.getStatus());
-    return resp;
+  public TSendPlanNodeBatchResp sendPlanNode(TSendPlanNodeBatchReq req) {
+    TSendPlanNodeBatchResp responses = new TSendPlanNodeBatchResp();
+    req.getRequests()
+        .forEach(
+            request -> {
+              ConsensusGroupId groupId =
+                  ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                      request.getConsensusGroupId());
+              PlanNode planNode = PlanNodeType.deserialize(request.planNode.body);
+              RegionWriteExecutor executor = new RegionWriteExecutor();
+              TSendPlanNodeSingleResp resp = new TSendPlanNodeSingleResp();
+              RegionExecutionResult executionResult = executor.execute(groupId, planNode);
+              resp.setAccepted(executionResult.isAccepted());
+              resp.setMessage(executionResult.getMessage());
+              resp.setStatus(executionResult.getStatus());
+              responses.addToResponses(resp);
+            });
+    return responses;
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index c06fcf6007..43e9cc680b 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -43,8 +43,9 @@ import org.apache.iotdb.db.service.thrift.impl.DataNodeInternalRPCServiceImpl;
 import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -61,11 +62,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DataNodeInternalRPCServiceImplTest {
+
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
   private static final int dataNodeId = 0;
@@ -145,16 +148,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -221,16 +226,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createAlignedTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -308,16 +315,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createMultiTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   private TRegionReplicaSet genRegionReplicaSet() {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 73b631487b..c4c40eeed3 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -118,12 +118,20 @@ struct TSendFragmentInstanceResp {
   2: optional string message
 }
 
-struct TSendPlanNodeReq {
+struct TSendPlanNodeBatchReq {
+  1: required list<TSendPlanNodeSingleReq> requests;
+}
+
+struct TSendPlanNodeSingleReq {
   1: required TPlanNode planNode
   2: required common.TConsensusGroupId consensusGroupId
 }
 
-struct TSendPlanNodeResp {
+struct TSendPlanNodeBatchResp {
+  1: required list<TSendPlanNodeSingleResp> responses;
+}
+
+struct TSendPlanNodeSingleResp {
   1: required bool accepted
   2: optional string message
   3: optional common.TSStatus status
@@ -472,7 +480,7 @@ service IDataNodeRPCService {
   /**
   * dispatch PlanNode to remote node for write request in order to save resource
   */
-  TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req);
+  TSendPlanNodeBatchResp sendPlanNode(TSendPlanNodeBatchReq req);
 
   TFragmentInstanceInfoResp fetchFragmentInstanceInfo(TFetchFragmentInstanceInfoReq req);