You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/09 16:55:03 UTC

[iotdb] branch IOTDB-5848 updated (ad0d1fae91 -> 2f29861d7a)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from ad0d1fae91 introduce reconnect strategy when connection broken
     add 6118e6cec6 [IOTDB-5837] Fix exception in select-into of ALIGN BY DEVICE query
     add b5d13d1876 Delete abandoned API in document (#9773)
     add 109b15434c Use two params to distinguish fsync delay in the wal sync mode and async mode (#9707)
     add 6160095863 [IOTDB-5841] Modify IoTConsensus default parameters to improve performance in more scenarios (#9771)
     add e39060e1d1 update ratis version to 2.5.1 (#9785)
     new 500e2b854c Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848
     new c353c197a0 mvn spotless:apply
     add 299f33c80c [IOTDB-5821] Pipe: PipeCollector Stage (#9789)
     new 94d13c5a92 Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848
     new 8a92f62f61 introduce PendingQueue
     new d0cdd26413 introduce PendingQueue
     new 240fe85737 dispatch realtime events when processing
     add db8849fce1 [IOTDB-5740] Fix drop database does not delete its template (#9792)
     add f579e3ca0d Support Create and Query Schema of Logical View (#9742)
     add 3b44a55bb2 [Doc] Fix error path of Dockerfile (#9725)
     add 16011faf94 [To rel/1.1] [IOTDB-5844] Fix compaction module getting stuck (#9776) (#9790)
     add dde59e204e Refactor SchemaIT for Acceleration (#9794)
     add 5c20ee49d3 [IOTDB-5831] Fix create region failure after recreate db (#9800)
     new 0420bc0dee Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848
     new 2f29861d7a ListenablePendingQueue

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  22 ++
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   4 +
 client-py/tests/test_dataframe.py                  |   2 +
 .../confignode/manager/ClusterSchemaManager.java   |  13 +-
 consensus/pom.xml                                  |   2 +-
 .../iotdb/consensus/config/IoTConsensusConfig.java |  36 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |   8 +-
 .../logdispatcher/LogDispatcherThreadMetrics.java  |  22 ++
 .../consensus/iot/logdispatcher/SyncStatus.java    |   2 -
 docs/UserGuide/API/Programming-Java-Native-API.md  |  98 ------
 docs/UserGuide/QuickStart/WayToGetIoTDB.md         |   2 +-
 docs/UserGuide/Reference/Common-Config-Manual.md   |  29 +-
 docs/UserGuide/Reference/DataNode-Config-Manual.md |   4 +-
 .../UserGuide/API/Programming-Java-Native-API.md   |  93 -----
 .../zh/UserGuide/Reference/Common-Config-Manual.md |  29 +-
 .../UserGuide/Reference/DataNode-Config-Manual.md  |   4 +-
 .../apache/iotdb/db/it/query/IoTDBResultSetIT.java |   2 +
 .../db/it/schema/IoTDBAutoCreateSchemaIT.java      |  17 +-
 .../it/schema/IoTDBCreateAlignedTimeseriesIT.java  |  17 +-
 .../db/it/schema/IoTDBCreateStorageGroupIT.java    |  18 +-
 .../db/it/schema/IoTDBCreateTimeseriesIT.java      |  17 +-
 .../db/it/schema/IoTDBDeactivateTemplateIT.java    |  26 +-
 .../it/schema/IoTDBDeleteAlignedTimeseriesIT.java  |  18 +-
 .../db/it/schema/IoTDBDeleteStorageGroupIT.java    |  17 +-
 .../db/it/schema/IoTDBDeleteTimeseriesIT.java      |  18 +-
 .../iotdb/db/it/schema/IoTDBExtendTemplateIT.java  |  49 +--
 .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java   |  63 ++--
 .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java  |  49 +--
 .../db/it/schema/IoTDBSortedShowTimeseriesIT.java  |  20 +-
 .../apache/iotdb/db/it/schema/IoTDBTagAlterIT.java |  17 +-
 .../org/apache/iotdb/db/it/schema/IoTDBTagIT.java  |  17 +-
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  |  30 ++
 .../org/apache/iotdb/util/AbstractSchemaIT.java    |  48 ++-
 .../iotdb/zeppelin/it/IoTDBInterpreterIT.java      |  14 +-
 .../resources/conf/iotdb-common.properties         |  15 +-
 .../iotdb/commons/consensus/DataRegionId.java      |   4 -
 .../org/apache/iotdb/commons/path/PartialPath.java |   4 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   8 +-
 .../plugin/builtin/collector/DefaultCollector.java |  60 ++++
 .../node/common/AbstractMeasurementMNode.java      |   4 +
 .../schema/node/role/IMeasurementMNode.java        |   2 +
 .../commons/schema/node/utils/IMNodeFactory.java   |   4 +
 .../schemaregion/rocksdb/RSchemaRegion.java        |   7 +
 .../rocksdb/mnode/RMeasurementMNode.java           |   5 +
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |   7 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  40 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  26 +-
 .../CompactionMemoryNotEnoughException.java}       |  20 +-
 .../execute/task/CrossSpaceCompactionTask.java     |   6 +-
 .../apache/iotdb/db/metadata/MetadataConstant.java |   6 +
 .../mnode/config/factory/ConfigMNodeFactory.java   |   7 +
 .../mnode/mem/factory/MemMNodeFactory.java         |  14 +
 ...MeasurementMNode.java => LogicalViewMNode.java} |  30 +-
 .../metadata/mnode/mem/impl/LogicalViewSchema.java | 215 +++++++++++
 .../metadata/mnode/mem/impl/MeasurementMNode.java  |   5 +
 .../metadata/mnode/mem/info/LogicalViewInfo.java   | 161 +++++++++
 .../schemafile/factory/CacheMNodeFactory.java      |   7 +
 .../schemafile/impl/CachedMeasurementMNode.java    |   5 +
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  10 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  75 +++-
 .../mtree/snapshot/MemMTreeSnapshotUtil.java       |  41 ++-
 .../plan/schemaregion/SchemaRegionPlanType.java    |   2 +
 .../plan/schemaregion/SchemaRegionPlanVisitor.java |   5 +
 .../impl/SchemaRegionPlanDeserializer.java         |  24 ++
 .../impl/SchemaRegionPlanSerializer.java           |  27 ++
 .../impl/SchemaRegionPlanTxtSerializer.java        |  21 ++
 .../impl/write/CreateLogicalViewPlanImpl.java      |  75 ++++
 .../schemaregion/result/ShowTimeSeriesResult.java  |  14 +-
 .../schemaregion/write/ICreateLogicalViewPlan.java |  66 ++++
 .../metadata/query/info/ITimeSeriesSchemaInfo.java |   6 +-
 .../metadata/rescon/MemSchemaRegionStatistics.java |   8 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   5 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |  40 +++
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   7 +
 .../metadata/template/ClusterTemplateManager.java  |  36 ++
 .../view/viewExpression/ViewExpression.java        | 318 +++++++++++++++++
 .../view/viewExpression/ViewExpressionType.java    |  76 ++++
 .../binary/BinaryViewExpression.java               | 119 +++++++
 .../binary/arithmetic/AdditionViewExpression.java  |  61 ++++
 .../arithmetic/ArithmeticBinaryViewExpression.java |  52 +++
 .../binary/arithmetic/DivisionViewExpression.java  |  61 ++++
 .../binary/arithmetic/ModuloViewExpression.java    |  61 ++++
 .../arithmetic/MultiplicationViewExpression.java   |  63 ++++
 .../arithmetic/SubtractionViewExpression.java      |  61 ++++
 .../compare/CompareBinaryViewExpression.java       |  52 +++
 .../binary/compare/EqualToViewExpression.java      |  57 +++
 .../binary/compare/GreaterEqualViewExpression.java |  57 +++
 .../binary/compare/GreaterThanViewExpression.java  |  57 +++
 .../binary/compare/LessEqualViewExpression.java    |  57 +++
 .../binary/compare/LessThanViewExpression.java     |  57 +++
 .../binary/compare/NonEqualViewExpression.java     |  57 +++
 .../binary/logic/LogicAndViewExpression.java       |  57 +++
 .../binary/logic/LogicBinaryViewExpression.java    |  52 +++
 .../binary/logic/LogicOrViewExpression.java        |  57 +++
 .../viewExpression/leaf/ConstantViewOperand.java   |  98 ++++++
 .../view/viewExpression/leaf/LeafViewOperand.java} |  27 +-
 .../view/viewExpression/leaf/NullViewOperand.java  |  61 ++++
 .../viewExpression/leaf/TimeSeriesViewOperand.java |  87 +++++
 .../viewExpression/leaf/TimestampViewOperand.java  |  72 ++++
 .../multi/FunctionViewExpression.java              | 198 +++++++++++
 .../ternary/BetweenViewExpression.java             | 111 ++++++
 .../ternary/TernaryViewExpression.java             | 105 ++++++
 .../viewExpression/unary/InViewExpression.java     | 106 ++++++
 .../viewExpression/unary/IsNullViewExpression.java |  89 +++++
 .../viewExpression/unary/LikeViewExpression.java   | 163 +++++++++
 .../unary/LogicNotViewExpression.java              |  61 ++++
 .../unary/NegationViewExpression.java              |  61 ++++
 .../unary/RegularViewExpression.java               | 105 ++++++
 .../viewExpression/unary/UnaryViewExpression.java  |  73 ++++
 .../visitor/GetSourcePathsVisitor.java             |  95 +++++
 .../visitor/TransformToExpressionVisitor.java      | 320 +++++++++++++++++
 .../visitor/ViewExpressionVisitor.java             | 219 ++++++++++++
 .../metadata/visitor/SchemaExecutionVisitor.java   |  22 ++
 .../db/mpp/common/header/ColumnHeaderConstant.java |   6 +-
 .../execution/executor/RegionWriteExecutor.java    |  49 +++
 .../operator/process/DeviceViewIntoOperator.java   |   6 +-
 .../schema/source/TimeSeriesSchemaSource.java      |  14 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  45 +++
 .../visitor/TransformToViewExpressionVisitor.java  | 391 +++++++++++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  86 +++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  21 ++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../node/metedata/write/CreateLogicalViewNode.java | 250 +++++++++++++
 .../iotdb/db/mpp/plan/statement/StatementType.java |   2 +
 .../db/mpp/plan/statement/StatementVisitor.java    |   7 +
 .../metadata/CreateLogicalViewStatement.java       | 246 +++++++++++++
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  26 +-
 .../PipeCollectorConstant.java}                    |  20 +-
 .../PipeConnectorConstant.java}                    |  18 +-
 .../PipeProcessorConstant.java}                    |  18 +-
 .../core/collector/IoTDBDataRegionCollector.java   |  85 +++++
 ...> PipeHistoricalDataRegionTsFileCollector.java} |  39 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  21 +-
 ... => PipeRealtimeDataRegionHybridCollector.java} |  33 +-
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  31 +-
 .../connector/PipeConnectorSubtaskManager.java     |  19 +-
 .../event/view/collector/PipeEventCollector.java   |  59 +++-
 ...anager.java => PipeSubtaskExecutorManager.java} |  12 +-
 .../execution/scheduler/PipeTaskScheduler.java     |  18 +-
 .../iotdb/db/pipe/task/queue/EventSupplier.java    |  25 +-
 .../ListenableBlockingPendingQueue.java}           |  18 +-
 .../db/pipe/task/queue/ListenablePendingQueue.java | 159 +++++++++
 .../ListenableUnblockingPendingQueue.java}         |  18 +-
 .../PendingQueueEmptyToNotEmptyListener.java}      |  19 +-
 .../PendingQueueFullToNotFullListener.java}        |  19 +-
 .../PendingQueueNotEmptyToEmptyListener.java}      |  19 +-
 .../PendingQueueNotFullToFullListener.java}        |  19 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  78 ++--
 .../db/pipe/task/stage/PipeTaskConnectorStage.java | 100 +-----
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 103 +++++-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  95 ++++-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  48 ++-
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  20 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  20 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   1 +
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |   9 +-
 .../metadata/view/ViewExpressionToStringTest.java  | 183 ++++++++++
 .../collector/CachedSchemaPatternMatcherTest.java  |  48 ++-
 .../core/collector/PipeRealtimeCollectTest.java    |  59 +++-
 .../executor/PipeConnectorSubtaskExecutorTest.java |   5 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |   5 +-
 162 files changed, 7145 insertions(+), 914 deletions(-)
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
 copy server/src/main/java/org/apache/iotdb/db/{metadata/query/info/ITimeSeriesSchemaInfo.java => engine/compaction/execute/exception/CompactionMemoryNotEnoughException.java} (69%)
 copy server/src/main/java/org/apache/iotdb/db/metadata/mnode/mem/impl/{MeasurementMNode.java => LogicalViewMNode.java} (67%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/mnode/mem/impl/LogicalViewSchema.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/mnode/mem/info/LogicalViewInfo.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/write/CreateLogicalViewPlanImpl.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/write/ICreateLogicalViewPlan.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/ViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/ViewExpressionType.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/BinaryViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/arithmetic/AdditionViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/arithmetic/ArithmeticBinaryViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/arithmetic/DivisionViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/arithmetic/ModuloViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/arithmetic/MultiplicationViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/arithmetic/SubtractionViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/CompareBinaryViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/EqualToViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/GreaterEqualViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/GreaterThanViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/LessEqualViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/LessThanViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/compare/NonEqualViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/logic/LogicAndViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/logic/LogicBinaryViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/binary/logic/LogicOrViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/leaf/ConstantViewOperand.java
 rename server/src/{test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java => main/java/org/apache/iotdb/db/metadata/view/viewExpression/leaf/LeafViewOperand.java} (60%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/leaf/NullViewOperand.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/leaf/TimeSeriesViewOperand.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/leaf/TimestampViewOperand.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/multi/FunctionViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/ternary/BetweenViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/ternary/TernaryViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/InViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/IsNullViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/LikeViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/LogicNotViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/NegationViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/RegularViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/unary/UnaryViewExpression.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/visitor/GetSourcePathsVisitor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/visitor/TransformToExpressionVisitor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/view/viewExpression/visitor/ViewExpressionVisitor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/TransformToViewExpressionVisitor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateLogicalViewNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateLogicalViewStatement.java
 copy server/src/main/java/org/apache/iotdb/db/pipe/{task/subtask/PipeAssignerSubtask.java => config/PipeCollectorConstant.java} (68%)
 copy server/src/main/java/org/apache/iotdb/db/pipe/{task/subtask/PipeAssignerSubtask.java => config/PipeConnectorConstant.java} (73%)
 copy server/src/main/java/org/apache/iotdb/db/pipe/{task/subtask/PipeAssignerSubtask.java => config/PipeProcessorConstant.java} (73%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/{PipeHistoricalTsFileCollector.java => PipeHistoricalDataRegionTsFileCollector.java} (76%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/{PipeRealtimeHybridDataRegionCollector.java => PipeRealtimeDataRegionHybridCollector.java} (85%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/{PipeTaskExecutorManager.java => PipeSubtaskExecutorManager.java} (86%)
 copy node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java => server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java (65%)
 copy server/src/main/java/org/apache/iotdb/db/pipe/task/{subtask/PipeAssignerSubtask.java => queue/ListenableBlockingPendingQueue.java} (69%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
 copy server/src/main/java/org/apache/iotdb/db/pipe/task/{subtask/PipeAssignerSubtask.java => queue/ListenableUnblockingPendingQueue.java} (71%)
 copy server/src/main/java/org/apache/iotdb/db/pipe/task/{subtask/PipeAssignerSubtask.java => queue/PendingQueueEmptyToNotEmptyListener.java} (73%)
 copy server/src/main/java/org/apache/iotdb/db/pipe/task/{subtask/PipeAssignerSubtask.java => queue/PendingQueueFullToNotFullListener.java} (73%)
 copy server/src/main/java/org/apache/iotdb/db/pipe/task/{subtask/PipeAssignerSubtask.java => queue/PendingQueueNotEmptyToEmptyListener.java} (73%)
 rename server/src/main/java/org/apache/iotdb/db/pipe/task/{subtask/PipeAssignerSubtask.java => queue/PendingQueueNotFullToFullListener.java} (73%)
 create mode 100644 server/src/test/java/org/apache/iotdb/db/metadata/view/ViewExpressionToStringTest.java


[iotdb] 02/08: mvn spotless:apply

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c353c197a0adc0fa57415e4625d85037362519f1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 22:43:30 2023 +0800

    mvn spotless:apply
---
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  4 +--
 .../executor/PipeAssignerSubtaskExecutorTest.java  | 40 ----------------------
 2 files changed, 1 insertion(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index c7ab2bd58e..c07f83632c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -43,9 +43,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
   // output
   private final PipeConnector pipeConnector;
 
-  /**
-   * @param taskID connectorAttributeSortedString
-   */
+  /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
     super(taskID);
     // TODO: make the queue size reasonable and configurable
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
deleted file mode 100644
index c3c5aedb86..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.execution.executor;
-
-import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
-
-import org.junit.Before;
-import org.mockito.Mockito;
-
-public class PipeAssignerSubtaskExecutorTest extends PipeSubtaskExecutorTest {
-
-  @Before
-  public void setUp() throws Exception {
-    executor = new PipeAssignerSubtaskExecutor();
-
-    subtask =
-        Mockito.spy(
-            new PipeAssignerSubtask("PipeAssignerSubtaskExecutorTest") {
-              @Override
-              public void executeForAWhile() {}
-            });
-  }
-}


[iotdb] 03/08: Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94d13c5a928db21e074f941306d9841a48cff64e
Merge: c353c197a0 299f33c80c
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 22:51:14 2023 +0800

    Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848

 .../iotdb/commons/consensus/DataRegionId.java      |  4 --
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  8 ++-
 .../plugin/builtin/collector/DefaultCollector.java | 60 ++++++++++++++++
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  7 ++
 .../db/pipe/config/PipeCollectorConstant.java      | 20 ++----
 .../core/collector/IoTDBDataRegionCollector.java   | 84 ++++++++++++++++++++++
 ...> PipeHistoricalDataRegionTsFileCollector.java} | 39 +++++-----
 .../realtime/PipeRealtimeDataRegionCollector.java  | 21 +++---
 ... => PipeRealtimeDataRegionHybridCollector.java} | 13 ++--
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 34 +++++----
 .../collector/CachedSchemaPatternMatcherTest.java  | 48 ++++++++++---
 .../core/collector/PipeRealtimeCollectTest.java    | 58 ++++++++++++---
 12 files changed, 303 insertions(+), 93 deletions(-)


[iotdb] 05/08: introduce PendingQueue

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d0cdd264132dd99414b603c6610fa65d667753ad
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 9 21:56:21 2023 +0800

    introduce PendingQueue
---
 .../db/pipe/agent/plugin/PipePluginAgent.java      | 25 +++++++--
 .../db/pipe/config/PipeCollectorConstant.java      |  3 +-
 ...torConstant.java => PipeConnectorConstant.java} |  7 ++-
 ...torConstant.java => PipeProcessorConstant.java} |  7 ++-
 .../core/collector/IoTDBDataRegionCollector.java   |  5 +-
 .../PipeHistoricalDataRegionTsFileCollector.java   |  4 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  4 +-
 .../PipeRealtimeDataRegionHybridCollector.java     | 15 +++---
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  7 +--
 .../connector/PipeConnectorSubtaskManager.java     |  5 +-
 .../binder/EventSupplier.java}                     | 17 +++---
 .../iotdb/db/pipe/task/binder/PendingQueue.java    | 22 ++++----
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 42 ++++++++++++---
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  9 +---
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 61 +++++++++++++---------
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  8 ---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java | 12 ++---
 .../collector/CachedSchemaPatternMatcherTest.java  |  6 +--
 .../core/collector/PipeRealtimeCollectTest.java    | 16 +++---
 .../executor/PipeProcessorSubtaskExecutorTest.java |  4 +-
 20 files changed, 164 insertions(+), 115 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d0b42f6fa2..3474edddf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -19,12 +19,15 @@
 
 package org.apache.iotdb.db.pipe.agent.plugin;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipePlugin;
@@ -191,16 +194,28 @@ public class PipePluginAgent {
   }
 
   public PipeCollector reflectCollector(PipeParameters collectorParameters) {
-    return new IoTDBDataRegionCollector(); // TODO: reflect plugin, use PipeIoTDBCollector as
-    // default collector
+    return (PipeCollector)
+        reflect(
+            collectorParameters.getStringOrDefault(
+                PipeCollectorConstant.COLLECTOR_KEY,
+                BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName()));
   }
 
   public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return (PipeProcessor)
+        reflect(
+            processorParameters.getStringOrDefault(
+                PipeProcessorConstant.PROCESSOR_KEY,
+                BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()));
   }
 
   public PipeConnector reflectConnector(PipeParameters connectorParameters) {
-    throw new UnsupportedOperationException("Not supported yet.");
+    if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+      throw new PipeManagementException(
+          "Failed to reflect PipeConnector instance because 'connector' is not specified in the parameters.");
+    }
+    return (PipeConnector)
+        reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY));
   }
 
   private PipePlugin reflect(String pluginName) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index b75d2168da..5906de3a49 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -21,8 +21,9 @@ package org.apache.iotdb.db.pipe.config;
 
 public class PipeCollectorConstant {
 
+  public static final String COLLECTOR_KEY = "collector";
   public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+  public static final String DATA_REGION_KEY = "collector.data-region";
 
   private PipeCollectorConstant() {
     throw new IllegalStateException("Utility class");
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
index b75d2168da..67a637503b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.db.pipe.config;
 
-public class PipeCollectorConstant {
+public class PipeConnectorConstant {
 
-  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+  public static final String CONNECTOR_KEY = "connector";
 
-  private PipeCollectorConstant() {
+  private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
index b75d2168da..1af34f3ef3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.db.pipe.config;
 
-public class PipeCollectorConstant {
+public class PipeProcessorConstant {
 
-  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+  public static final String PROCESSOR_KEY = "processor";
 
-  private PipeCollectorConstant() {
+  private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index 0fc9fdf69e..d5638d67a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -38,9 +39,9 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   // TODO: support pattern in historical collector
   private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
 
-  public IoTDBDataRegionCollector() {
+  public IoTDBDataRegionCollector(PendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
-    realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+    realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
     historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 9ba2f3f9b7..4559ca8a53 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -43,13 +43,13 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
-    dataRegionId = parameters.getInt(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 0825b14023..08e731df13 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -35,14 +35,14 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
-    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
     pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
-    dataRegionId = parameters.getString(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index a48654d292..9115d91ef8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -22,13 +22,12 @@ package org.apache.iotdb.db.pipe.core.collector.realtime;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 // TODO: make this collector as a builtin pipe plugin. register it in BuiltinPipePlugin.
 public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector {
 
@@ -38,12 +37,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
+  private final PendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionHybridCollector() {
-    this.pendingQueue =
-        new ArrayBlockingQueue<>(
-            PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+  public PipeRealtimeDataRegionHybridCollector(PendingQueue<Event> pendingQueue) {
+    this.pendingQueue = pendingQueue;
   }
 
   @Override
@@ -103,7 +100,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
   @Override
   public Event supply() {
-    PipeRealtimeCollectEvent collectEvent = pendingQueue.poll();
+    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
 
     while (collectEvent != null) {
       Event suppliedEvent;
@@ -124,7 +121,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
         return suppliedEvent;
       }
 
-      collectEvent = pendingQueue.poll();
+      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
index 979cecfbc8..f794a7a7a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -22,12 +22,13 @@ package org.apache.iotdb.db.pipe.core.connector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
 
 public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
-  private final PendingQueue pendingQueue;
+  private final PendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
@@ -35,7 +36,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
       PipeConnectorSubtask subtask,
-      PendingQueue pendingQueue) {
+      PendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
     this.pendingQueue =
@@ -58,7 +59,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
-  public PendingQueue getPendingQueue() {
+  public PendingQueue<Event> getPendingQueue() {
     return pendingQueue;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index feef134b3e..feb2797022 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.HashMap;
@@ -44,7 +45,7 @@ public class PipeConnectorSubtaskManager {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
       // TODO: make pendingQueue size configurable
-      final PendingQueue pendingQueue = new PendingQueue(1024 * 1024);
+      final PendingQueue<Event> pendingQueue = new PendingQueue<>(1024 * 1024);
       final PipeConnectorSubtask pipeConnectorSubtask =
           new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
@@ -96,7 +97,7 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
-  public PendingQueue getPipeConnectorPendingQueue(String attributeSortedString) {
+  public PendingQueue<Event> getPipeConnectorPendingQueue(String attributeSortedString) {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       throw new PipeException(
           "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
index b75d2168da..01abb027d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
@@ -17,14 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.task.binder;
 
-public class PipeCollectorConstant {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+@FunctionalInterface
+public interface EventSupplier {
 
-  private PipeCollectorConstant() {
-    throw new IllegalStateException("Utility class");
-  }
+  /**
+   * @return the event to be supplied. The event may be null if the collector has no more events at
+   *     the moment but is still running and may produce more events later.
+   * @throws Exception if the supplier fails to supply the event.
+   */
+  Event supply() throws Exception;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
index 8500382bbe..1e123b7390 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
@@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PendingQueue {
+public class PendingQueue<E extends Event> {
 
-  private final Queue<Event> pendingQueue;
+  private final Queue<E> pendingQueue;
 
   private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
       new ConcurrentHashMap<>();
@@ -47,7 +47,7 @@ public class PendingQueue {
     this.pendingQueue = new ArrayBlockingQueue<>(pendingQueueSize);
   }
 
-  public PendingQueue registerEmptyToNotEmptyListener(
+  public PendingQueue<E> registerEmptyToNotEmptyListener(
       String id, PendingQueueEmptyToNotEmptyListener listener) {
     emptyToNotEmptyListeners.put(id, listener);
     return this;
@@ -63,7 +63,7 @@ public class PendingQueue {
         .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
   }
 
-  public PendingQueue registerNotEmptyToEmptyListener(
+  public PendingQueue<E> registerNotEmptyToEmptyListener(
       String id, PendingQueueNotEmptyToEmptyListener listener) {
     notEmptyToEmptyListeners.put(id, listener);
     return this;
@@ -79,7 +79,7 @@ public class PendingQueue {
         .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
   }
 
-  public PendingQueue registerFullToNotFullListener(
+  public PendingQueue<E> registerFullToNotFullListener(
       String id, PendingQueueFullToNotFullListener listener) {
     fullToNotFullListeners.put(id, listener);
     return this;
@@ -95,7 +95,7 @@ public class PendingQueue {
         .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
   }
 
-  public PendingQueue registerNotFullToFullListener(
+  public PendingQueue<E> registerNotFullToFullListener(
       String id, PendingQueueNotFullToFullListener listener) {
     notFullToFullListeners.put(id, listener);
     return this;
@@ -111,7 +111,7 @@ public class PendingQueue {
         .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
   }
 
-  public boolean offer(Event event) {
+  public boolean offer(E event) {
     final boolean isEmpty = pendingQueue.isEmpty();
     final boolean isAdded = pendingQueue.offer(event);
 
@@ -131,9 +131,9 @@ public class PendingQueue {
     return isAdded;
   }
 
-  public Event poll() {
+  public E poll() {
     final boolean isEmpty = pendingQueue.isEmpty();
-    final Event event = pendingQueue.poll();
+    final E event = pendingQueue.poll();
 
     if (event == null) {
       // we don't use size() == 0 to check whether the listener should be called,
@@ -158,4 +158,8 @@ public class PendingQueue {
   public int size() {
     return pendingQueue.size();
   }
+
+  public void disable() {
+    pendingQueue = null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 2027b1ca75..770614cb6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,25 +19,56 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeParameters collectorParameters;
 
+  /**
+   * TODO: have a better way to control busy/idle status of PipeTaskCollectorStage.
+   *
+   * <p>Currently, this field is for IoTDBDataRegionCollector only. IoTDBDataRegionCollector uses
+   * collectorPendingQueue as an internal data structure to store realtime events.
+   *
+   * <p>PendingQueue can detect whether the queue is empty or not, and it can notify the
+   * PipeTaskProcessorStage to stop processing data when the queue is empty to avoid unnecessary
+   * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
+   * queue is not empty.
+   */
+  private PendingQueue<Event> collectorPendingQueue;
+
   private PipeCollector pipeCollector;
 
-  PipeTaskCollectorStage(PipeParameters collectorParameters) {
+  public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
     this.collectorParameters = collectorParameters;
+    // Put the data region id into the collector parameters so that the collector can read it
+    // from its own parameters.
+    collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
   }
 
   @Override
   public void createSubtask() throws PipeException {
-    this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
+    if (collectorParameters
+        .getStringOrDefault(
+            PipeCollectorConstant.COLLECTOR_KEY,
+            BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
+        .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
+      collectorPendingQueue =
+          new PendingQueue<>(PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+      this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+    } else {
+      this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
+    }
   }
 
   @Override
@@ -63,8 +94,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
     }
   }
 
-  @Override
-  public PipeSubtask getSubtask() {
-    throw new UnsupportedOperationException("Collector stage does not have subtask.");
+  public PendingQueue<Event> getCollectorPendingQueue() {
+    return collectorPendingQueue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 7441128d95..e385b1266f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public class PipeTaskConnectorStage extends PipeTaskStage {
@@ -60,12 +60,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
   }
 
-  @Override
-  public PipeSubtask getSubtask() {
-    return PipeConnectorSubtaskManager.instance().getPipeConnectorSubtask(connectorSubtaskId);
-  }
-
-  public PendingQueue getPipeConnectorPendingQueue() {
+  public PendingQueue<Event> getPipeConnectorPendingQueue() {
     return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 5c239f1958..8aa112d04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -24,13 +24,16 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
 import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import javax.annotation.Nullable;
+
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
   protected final PipeProcessorSubtaskExecutor executor =
@@ -38,15 +41,25 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
   protected final PipeProcessorSubtask subtask;
 
-  protected final PendingQueue pipeCollectorInputPendingQueue;
-  protected final PendingQueue pipeConnectorOutputPendingQueue;
-
+  protected final PendingQueue<Event> pipeCollectorInputPendingQueue;
+  protected final PendingQueue<Event> pipeConnectorOutputPendingQueue;
+
+  /**
+   * @param pipeName pipe name
+   * @param dataRegionId data region id
+   * @param pipeCollectorInputEventSupplier used to input events from pipe collector
+   * @param pipeCollectorInputPendingQueue used to listen for the pipe collector event queue
+   *     changing from empty to not empty or from not empty to empty; null means no need to listen
+   * @param pipeProcessorParameters used to create pipe processor
+   * @param pipeConnectorOutputPendingQueue used to output events to pipe connector
+   */
   protected PipeTaskProcessorStage(
       String pipeName,
       String dataRegionId,
-      PendingQueue pipeCollectorInputPendingQueue,
+      EventSupplier pipeCollectorInputEventSupplier,
+      @Nullable PendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
-      PendingQueue pipeConnectorOutputPendingQueue) {
+      PendingQueue<Event> pipeConnectorOutputPendingQueue) {
     final String taskId = pipeName + "_" + dataRegionId;
     final PipeProcessor pipeProcessor =
         PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
@@ -56,21 +69,22 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     this.subtask =
         new PipeProcessorSubtask(
             taskId,
-            pipeCollectorInputPendingQueue,
+            pipeCollectorInputEventSupplier,
             pipeProcessor,
             pipeConnectorOutputEventCollector);
 
     this.pipeCollectorInputPendingQueue =
-        pipeCollectorInputPendingQueue
-            .registerEmptyToNotEmptyListener(
-                taskId,
-                () -> {
-                  if (status == PipeStatus.RUNNING) {
-                    pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
-                    executor.start(subtask.getTaskID());
-                  }
-                })
-            .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()));
+        pipeCollectorInputPendingQueue != null
+            ? pipeCollectorInputPendingQueue
+                .registerEmptyToNotEmptyListener(
+                    taskId,
+                    () -> {
+                      if (status == PipeStatus.RUNNING) {
+                        executor.start(subtask.getTaskID());
+                      }
+                    })
+                .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()))
+            : null;
     this.pipeConnectorOutputPendingQueue =
         pipeConnectorOutputPendingQueue
             .registerNotFullToFullListener(taskId, () -> executor.stop(subtask.getTaskID()))
@@ -104,17 +118,14 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   public void dropSubtask() throws PipeException {
     final String taskId = subtask.getTaskID();
 
-    pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
-    pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
+    if (pipeCollectorInputPendingQueue != null) {
+      pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
+      pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
+    }
 
     pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
     pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
 
-    executor.deregister(subtask.getTaskID());
-  }
-
-  @Override
-  public PipeSubtask getSubtask() {
-    return subtask;
+    executor.deregister(taskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 1a802f1a15..e3a793c0d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public abstract class PipeTaskStage {
@@ -129,11 +128,4 @@ public abstract class PipeTaskStage {
   }
 
   protected abstract void dropSubtask() throws PipeException;
-
-  /**
-   * Get the pipe subtask.
-   *
-   * @return the pipe subtask.
-   */
-  public abstract PipeSubtask getSubtask();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index f863962e96..96cd6a70f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -35,24 +35,24 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtask.class);
 
-  private final PendingQueue inputPendingQueue;
+  private final EventSupplier inputEventSupplier;
   private final PipeProcessor pipeProcessor;
   private final EventCollector outputEventCollector;
 
   public PipeProcessorSubtask(
       String taskID,
-      PendingQueue inputPendingQueue,
+      EventSupplier inputEventSupplier,
       PipeProcessor pipeProcessor,
       EventCollector outputEventCollector) {
     super(taskID);
-    this.inputPendingQueue = inputPendingQueue;
+    this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
   }
 
   @Override
-  protected void executeForAWhile() {
-    final Event event = inputPendingQueue.poll();
+  protected void executeForAWhile() throws Exception {
+    final Event event = inputEventSupplier.supply();
     if (event == null) {
       return;
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index c453ee6af3..f376feb49c 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -70,7 +70,7 @@ public class CachedSchemaPatternMatcherTest {
             new HashMap<String, String>() {
               {
                 put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
-                put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                put(PipeCollectorConstant.DATA_REGION_KEY, "1");
               }
             }),
         null);
@@ -86,7 +86,7 @@ public class CachedSchemaPatternMatcherTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                  put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                 }
               }),
           null);
@@ -100,7 +100,7 @@ public class CachedSchemaPatternMatcherTest {
                 new HashMap<String, String>() {
                   {
                     put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
-                    put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                    put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                   }
                 }),
             null);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index f218e6865f..46ce856b82 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -83,20 +83,20 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector();
+            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector();
+            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector();
+            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector()) {
+            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue)) {
 
       collector1.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
           null);
@@ -105,7 +105,7 @@ public class PipeRealtimeCollectTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
           null);
@@ -114,7 +114,7 @@ public class PipeRealtimeCollectTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
           null);
@@ -123,7 +123,7 @@ public class PipeRealtimeCollectTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
           null);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index e6ded5976c..88997a0ca7 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -39,7 +39,7 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeProcessorSubtask(
                 "PipeProcessorSubtaskExecutorTest",
-                mock(PendingQueue.class),
+                mock(EventSupplier.class),
                 mock(PipeProcessor.class),
                 mock(EventCollector.class)) {
               @Override


[iotdb] 04/08: introduce PendingQueue

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8a92f62f6109a6c463183a6bbd863021db9c7db2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 9 18:31:01 2023 +0800

    introduce PendingQueue
---
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  30 +++-
 .../connector/PipeConnectorSubtaskManager.java     |  16 +-
 .../event/view/collector/PipeEventCollector.java   |  59 +++++++-
 ...anager.java => PipeSubtaskExecutorManager.java} |  12 +-
 .../execution/scheduler/PipeTaskScheduler.java     |  18 +--
 .../iotdb/db/pipe/task/binder/PendingQueue.java    | 161 +++++++++++++++++++++
 .../PendingQueueEmptyToNotEmptyListener.java}      |  19 +--
 .../PendingQueueFullToNotFullListener.java}        |  19 +--
 .../PendingQueueNotEmptyToEmptyListener.java}      |  19 +--
 .../PendingQueueNotFullToFullListener.java}        |  19 +--
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  10 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  97 +++----------
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  79 ++++++++--
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  91 +++++++++++-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  42 +++---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  14 +-
 .../executor/PipeConnectorSubtaskExecutorTest.java |   5 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |   5 +-
 18 files changed, 494 insertions(+), 221 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
index 7e71ae2f7a..979cecfbc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -20,20 +20,35 @@
 package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 
 public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
+  private final PendingQueue pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
 
   public PipeConnectorSubtaskLifeCycle(
-      PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
+      PipeConnectorSubtaskExecutor executor,
+      PipeConnectorSubtask subtask,
+      PendingQueue pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
+    this.pendingQueue =
+        pendingQueue
+            .registerEmptyToNotEmptyListener(
+                subtask.getTaskID(),
+                () -> {
+                  if (hasRunningTasks()) {
+                    executor.start(subtask.getTaskID());
+                  }
+                })
+            .registerNotEmptyToEmptyListener(
+                subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
 
     runningTaskCount = 0;
     aliveTaskCount = 0;
@@ -43,6 +58,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
+  public PendingQueue getPendingQueue() {
+    return pendingQueue;
+  }
+
   public synchronized void register() {
     if (aliveTaskCount < 0) {
       throw new IllegalStateException("aliveTaskCount < 0");
@@ -63,7 +82,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
       throw new IllegalStateException("aliveTaskCount <= 0");
     }
     if (aliveTaskCount == 1) {
-      executor.deregister(subtask.getTaskID());
+      close();
       // this subtask is out of life cycle, should never be used again
       return true;
     }
@@ -93,6 +112,13 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   @Override
   public synchronized void close() {
+    pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID());
+    pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID());
+
     executor.deregister(subtask.getTaskID());
   }
+
+  private synchronized boolean hasRunningTasks() {
+    return runningTaskCount > 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index a00cedf7b5..feef134b3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -42,10 +43,12 @@ public class PipeConnectorSubtaskManager {
 
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
+      // TODO: make pendingQueue size configurable
+      final PendingQueue pendingQueue = new PendingQueue(1024 * 1024);
       final PipeConnectorSubtask pipeConnectorSubtask =
-          new PipeConnectorSubtask(attributeSortedString, pipeConnector);
+          new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
-          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask);
+          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
       attributeSortedString2SubtaskLifeCycleMap.put(
           attributeSortedString, pipeConnectorSubtaskLifeCycle);
     }
@@ -93,6 +96,15 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
+  public PendingQueue getPipeConnectorPendingQueue(String attributeSortedString) {
+    if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
+      throw new PipeException(
+          "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
+    }
+
+    return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+  }
+
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
   private PipeConnectorSubtaskManager() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 7cc0778193..a9d9232150 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,21 +19,72 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
-import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
+  private final PendingQueue pendingQueue;
+
+  // buffer queue is used to store events that are not offered to pending queue
+  // because the pending queue is full. when pending queue is full, pending queue
+  // will notify tasks to stop collecting events, and buffer queue will be used to store
+  // events before tasks are stopped. when pending queue is not full and tasks are
+  // notified by the pending queue to start collecting events, buffer queue will be used to store
+  // events before events in buffer queue are offered to pending queue.
+  private final Queue<Event> bufferQueue;
+
+  public PipeEventCollector(PendingQueue pendingQueue) {
+    this.pendingQueue = pendingQueue;
+    bufferQueue = new LinkedList<>();
+  }
+
   @Override
-  public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
+  public void collectTabletInsertionEvent(TabletInsertionEvent event) {
+    collect(event);
+  }
 
   @Override
-  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
+    collect(event);
+  }
 
   @Override
-  public void collectDeletionEvent(DeletionEvent event) throws IOException {}
+  public void collectDeletionEvent(DeletionEvent event) {
+    collect(event);
+  }
+
+  private synchronized void collect(Event event) {
+    while (!bufferQueue.isEmpty()) {
+      final Event bufferedEvent = bufferQueue.peek();
+      if (pendingQueue.offer(bufferedEvent)) {
+        bufferQueue.poll();
+      } else {
+        bufferQueue.offer(event);
+        return;
+      }
+    }
+
+    if (!pendingQueue.offer(event)) {
+      bufferQueue.offer(event);
+    }
+  }
+
+  public synchronized void tryCollectBufferedEvents() {
+    while (!bufferQueue.isEmpty()) {
+      final Event bufferedEvent = bufferQueue.peek();
+      if (pendingQueue.offer(bufferedEvent)) {
+        bufferQueue.poll();
+      } else {
+        return;
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
index 8698a23d86..fc6a335ad0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
@@ -26,9 +26,9 @@ import org.slf4j.LoggerFactory;
  * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
  * PipeTaskScheduler. It is a singleton class.
  */
-public class PipeTaskExecutorManager {
+public class PipeSubtaskExecutorManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskExecutorManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutorManager.class);
 
   private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor;
   private final PipeProcessorSubtaskExecutor processorSubtaskExecutor;
@@ -48,19 +48,19 @@ public class PipeTaskExecutorManager {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
-  private PipeTaskExecutorManager() {
+  private PipeSubtaskExecutorManager() {
     assignerSubtaskExecutor = new PipeAssignerSubtaskExecutor();
     processorSubtaskExecutor = new PipeProcessorSubtaskExecutor();
     connectorSubtaskExecutor = new PipeConnectorSubtaskExecutor();
   }
 
   private static class PipeTaskExecutorHolder {
-    private static PipeTaskExecutorManager instance = null;
+    private static PipeSubtaskExecutorManager instance = null;
   }
 
-  public static PipeTaskExecutorManager setupAndGetInstance() {
+  public static synchronized PipeSubtaskExecutorManager getInstance() {
     if (PipeTaskExecutorHolder.instance == null) {
-      PipeTaskExecutorHolder.instance = new PipeTaskExecutorManager();
+      PipeTaskExecutorHolder.instance = new PipeSubtaskExecutorManager();
     }
     return PipeTaskExecutorHolder.instance;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
index 188bbac0e4..4f035ca671 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.scheduler;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeTaskExecutorManager;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 
 /**
  * PipeTaskScheduler is a singleton class that manages the numbers of threads used by
@@ -27,34 +27,34 @@ import org.apache.iotdb.db.pipe.execution.executor.PipeTaskExecutorManager;
  */
 public class PipeTaskScheduler {
 
-  private final PipeTaskExecutorManager pipeTaskExecutorManager =
-      PipeTaskExecutorManager.setupAndGetInstance();
+  private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
+      PipeSubtaskExecutorManager.getInstance();
 
   public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
     // TODO: make it configurable by setting different parameters
-    pipeTaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+    pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
   }
 
   public int getAssignerSubtaskExecutorThreadNum() {
-    return pipeTaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
+    return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
   }
 
   public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
     // TODO: make it configurable by setting different parameters
-    pipeTaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+    pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
   }
 
   public int getConnectorSubtaskExecutorThreadNum() {
-    return pipeTaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
+    return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
   }
 
   public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
     // TODO: make it configurable by setting different parameters
-    pipeTaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+    pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
   }
 
   public int getProcessorSubtaskExecutorThreadNum() {
-    return pipeTaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
+    return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
   }
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
new file mode 100644
index 0000000000..8500382bbe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.binder;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PendingQueue {
+
+  private final Queue<Event> pendingQueue;
+
+  private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
+      new ConcurrentHashMap<>();
+
+  private final AtomicBoolean isFull = new AtomicBoolean(false);
+
+  public PendingQueue(int pendingQueueSize) {
+    // TODO: make the size of the queue reasonable and configurable
+    this.pendingQueue = new ArrayBlockingQueue<>(pendingQueueSize);
+  }
+
+  public PendingQueue registerEmptyToNotEmptyListener(
+      String id, PendingQueueEmptyToNotEmptyListener listener) {
+    emptyToNotEmptyListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeEmptyToNotEmptyListener(String id) {
+    emptyToNotEmptyListeners.remove(id);
+  }
+
+  public void notifyEmptyToNotEmptyListeners() {
+    emptyToNotEmptyListeners
+        .values()
+        .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
+  }
+
+  public PendingQueue registerNotEmptyToEmptyListener(
+      String id, PendingQueueNotEmptyToEmptyListener listener) {
+    notEmptyToEmptyListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeNotEmptyToEmptyListener(String id) {
+    notEmptyToEmptyListeners.remove(id);
+  }
+
+  public void notifyNotEmptyToEmptyListeners() {
+    notEmptyToEmptyListeners
+        .values()
+        .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
+  }
+
+  public PendingQueue registerFullToNotFullListener(
+      String id, PendingQueueFullToNotFullListener listener) {
+    fullToNotFullListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeFullToNotFullListener(String id) {
+    fullToNotFullListeners.remove(id);
+  }
+
+  public void notifyFullToNotFullListeners() {
+    fullToNotFullListeners
+        .values()
+        .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+  }
+
+  public PendingQueue registerNotFullToFullListener(
+      String id, PendingQueueNotFullToFullListener listener) {
+    notFullToFullListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeNotFullToFullListener(String id) {
+    notFullToFullListeners.remove(id);
+  }
+
+  public void notifyNotFullToFullListeners() {
+    notFullToFullListeners
+        .values()
+        .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+  }
+
+  public boolean offer(Event event) {
+    final boolean isEmpty = pendingQueue.isEmpty();
+    final boolean isAdded = pendingQueue.offer(event);
+
+    if (isAdded) {
+      // we don't use size() == 1 to check whether the listener should be called,
+      // because offer() and size() are not atomic, and we don't want to use lock
+      // to make them atomic.
+      if (isEmpty) {
+        notifyEmptyToNotEmptyListeners();
+      }
+    } else {
+      if (isFull.compareAndSet(false, true)) {
+        notifyNotFullToFullListeners();
+      }
+    }
+
+    return isAdded;
+  }
+
+  public Event poll() {
+    final boolean isEmpty = pendingQueue.isEmpty();
+    final Event event = pendingQueue.poll();
+
+    if (event == null) {
+      // we don't use size() == 0 to check whether the listener should be called,
+      // because poll() and size() are not atomic, and we don't want to use lock
+      // to make them atomic.
+      if (!isEmpty) {
+        notifyNotEmptyToEmptyListeners();
+      }
+    } else {
+      if (isFull.compareAndSet(true, false)) {
+        notifyFullToNotFullListeners();
+      }
+    }
+
+    return event;
+  }
+
+  public void clear() {
+    pendingQueue.clear();
+  }
+
+  public int size() {
+    return pendingQueue.size();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
index 39ffaefeeb..6d855cdb46 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
@@ -17,21 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.binder;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+@FunctionalInterface
+public interface PendingQueueEmptyToNotEmptyListener {
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
-  }
-
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
-
-  @Override
-  public void close() {
-    // TODO
-  }
+  void onPendingQueueEmptyToNotEmpty();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
index 39ffaefeeb..91b6192c2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
@@ -17,21 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.binder;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+@FunctionalInterface
+public interface PendingQueueFullToNotFullListener {
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
-  }
-
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
-
-  @Override
-  public void close() {
-    // TODO
-  }
+  void onPendingQueueFullToNotFull();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
index 39ffaefeeb..80b89045bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
@@ -17,21 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.binder;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+@FunctionalInterface
+public interface PendingQueueNotEmptyToEmptyListener {
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
-  }
-
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
-
-  @Override
-  public void close() {
-    // TODO
-  }
+  void onPendingQueueNotEmptyToEmpty();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
index 39ffaefeeb..fe84f3ca4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
@@ -17,21 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.binder;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+@FunctionalInterface
+public interface PendingQueueNotFullToFullListener {
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
-  }
-
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
-
-  @Override
-  public void close() {
-    // TODO
-  }
+  void onPendingQueueNotFullToFull();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 51a344d580..2027b1ca75 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskCollectorStage implements PipeTaskStage {
+public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeParameters collectorParameters;
 
@@ -36,12 +36,12 @@ public class PipeTaskCollectorStage implements PipeTaskStage {
   }
 
   @Override
-  public void create() throws PipeException {
+  public void createSubtask() throws PipeException {
     this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
   }
 
   @Override
-  public void start() throws PipeException {
+  public void startSubtask() throws PipeException {
     try {
       pipeCollector.start();
     } catch (Exception e) {
@@ -50,12 +50,12 @@ public class PipeTaskCollectorStage implements PipeTaskStage {
   }
 
   @Override
-  public void stop() throws PipeException {
+  public void stopSubtask() throws PipeException {
     // collector continuously collects data, so do nothing in stop
   }
 
   @Override
-  public void drop() throws PipeException {
+  public void dropSubtask() throws PipeException {
     try {
       pipeCollector.close();
     } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 0194dc5c8a..7441128d95 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,114 +19,53 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskConnectorStage implements PipeTaskStage {
+public class PipeTaskConnectorStage extends PipeTaskStage {
 
-  protected final PipeConnectorSubtaskExecutor executor;
-  protected final PipeParameters connectorAttributes;
+  protected final PipeParameters pipeConnectorParameters;
 
-  protected PipeStatus status = null;
   protected String connectorSubtaskId = null;
-  protected boolean hasBeenExternallyStopped = false;
 
-  protected PipeTaskConnectorStage(
-      PipeConnectorSubtaskExecutor executor, PipeParameters connectorAttributes) {
-    this.executor = executor;
-    this.connectorAttributes = connectorAttributes;
+  protected PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
+    this.pipeConnectorParameters = pipeConnectorParameters;
   }
 
   @Override
-  public synchronized void create() throws PipeException {
-    if (status != null) {
-      if (status == PipeStatus.RUNNING) {
-        throw new PipeException(
-            String.format("The PipeConnectorSubtask %s has been started", connectorSubtaskId));
-      }
-      if (status == PipeStatus.DROPPED) {
-        throw new PipeException(
-            String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
-      }
-      // status == PipeStatus.STOPPED
-      if (hasBeenExternallyStopped) {
-        throw new PipeException(
-            String.format(
-                "The PipeConnectorSubtask %s has been externally stopped", connectorSubtaskId));
-      }
-      // otherwise, do nothing to allow retry strategy
-      return;
-    }
-
-    // status == null, register the connector
+  public void createSubtask() throws PipeException {
     connectorSubtaskId =
-        PipeConnectorSubtaskManager.instance().register(executor, connectorAttributes);
-    status = PipeStatus.STOPPED;
+        PipeConnectorSubtaskManager.instance()
+            .register(
+                PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
+                pipeConnectorParameters);
   }
 
   @Override
-  public synchronized void start() throws PipeException {
-    if (status == null) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
-    }
-    if (status == PipeStatus.RUNNING) {
-      // do nothing to allow retry strategy
-      return;
-    }
-    if (status == PipeStatus.DROPPED) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
-    }
-
-    // status == PipeStatus.STOPPED, start the connector
+  public void startSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
-    status = PipeStatus.RUNNING;
   }
 
   @Override
-  public synchronized void stop() throws PipeException {
-    if (status == null) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
-    }
-    if (status == PipeStatus.STOPPED) {
-      // do nothing to allow retry strategy
-      return;
-    }
-    if (status == PipeStatus.DROPPED) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
-    }
-
-    // status == PipeStatus.RUNNING, stop the connector
+  public void stopSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().stop(connectorSubtaskId);
-    status = PipeStatus.STOPPED;
-    hasBeenExternallyStopped = true;
   }
 
   @Override
-  public synchronized void drop() throws PipeException {
-    if (status == null) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
-    }
-    if (status == PipeStatus.DROPPED) {
-      // do nothing to allow retry strategy
-      return;
-    }
-
-    // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+  public void dropSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
-    status = PipeStatus.DROPPED;
   }
 
   @Override
   public PipeSubtask getSubtask() {
     return PipeConnectorSubtaskManager.instance().getPipeConnectorSubtask(connectorSubtaskId);
   }
+
+  public PendingQueue getPipeConnectorPendingQueue() {
+    return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index e7345be1e1..5c239f1958 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,40 +19,97 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskProcessorStage implements PipeTaskStage {
+public class PipeTaskProcessorStage extends PipeTaskStage {
 
-  protected final PipeSubtaskExecutor executor;
-  protected final PipeSubtask subtask;
+  protected final PipeProcessorSubtaskExecutor executor =
+      PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
+
+  protected final PipeProcessorSubtask subtask;
+
+  protected final PendingQueue pipeCollectorInputPendingQueue;
+  protected final PendingQueue pipeConnectorOutputPendingQueue;
 
   protected PipeTaskProcessorStage(
-      PipeProcessorSubtaskExecutor executor, PipeProcessorSubtask subtask) {
-    this.executor = executor;
-    this.subtask = subtask;
+      String pipeName,
+      String dataRegionId,
+      PendingQueue pipeCollectorInputPendingQueue,
+      PipeParameters pipeProcessorParameters,
+      PendingQueue pipeConnectorOutputPendingQueue) {
+    final String taskId = pipeName + "_" + dataRegionId;
+    final PipeProcessor pipeProcessor =
+        PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+    final PipeEventCollector pipeConnectorOutputEventCollector =
+        new PipeEventCollector(pipeConnectorOutputPendingQueue);
+
+    this.subtask =
+        new PipeProcessorSubtask(
+            taskId,
+            pipeCollectorInputPendingQueue,
+            pipeProcessor,
+            pipeConnectorOutputEventCollector);
+
+    this.pipeCollectorInputPendingQueue =
+        pipeCollectorInputPendingQueue
+            .registerEmptyToNotEmptyListener(
+                taskId,
+                () -> {
+                  if (status == PipeStatus.RUNNING) {
+                    pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
+                    executor.start(subtask.getTaskID());
+                  }
+                })
+            .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()));
+    this.pipeConnectorOutputPendingQueue =
+        pipeConnectorOutputPendingQueue
+            .registerNotFullToFullListener(taskId, () -> executor.stop(subtask.getTaskID()))
+            .registerFullToNotFullListener(
+                taskId,
+                () -> {
+                  // only start when the pipe is running
+                  if (status == PipeStatus.RUNNING) {
+                    pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
+                    executor.start(subtask.getTaskID());
+                  }
+                });
   }
 
   @Override
-  public void create() throws PipeException {
+  public void createSubtask() throws PipeException {
     executor.register(subtask);
   }
 
   @Override
-  public void start() throws PipeException {
+  public void startSubtask() throws PipeException {
     executor.start(subtask.getTaskID());
   }
 
   @Override
-  public void stop() throws PipeException {
+  public void stopSubtask() throws PipeException {
     executor.stop(subtask.getTaskID());
   }
 
   @Override
-  public void drop() throws PipeException {
+  public void dropSubtask() throws PipeException {
+    final String taskId = subtask.getTaskID();
+
+    pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
+    pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
+
+    pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
+    pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
+
     executor.deregister(subtask.getTaskID());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 5f57fbabcb..1a802f1a15 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -19,42 +19,121 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public interface PipeTaskStage {
+public abstract class PipeTaskStage {
+
+  protected PipeStatus status = null;
+  protected boolean hasBeenExternallyStopped = false;
 
   /**
    * Create a pipe task stage.
    *
    * @throws PipeException if failed to create a pipe task stage.
    */
-  void create() throws PipeException;
+  public synchronized void create() {
+    if (status != null) {
+      if (status == PipeStatus.RUNNING) {
+        throw new PipeException("The PipeTaskStage has been started");
+      }
+      if (status == PipeStatus.DROPPED) {
+        throw new PipeException("The PipeTaskStage has been dropped");
+      }
+      // status == PipeStatus.STOPPED
+      if (hasBeenExternallyStopped) {
+        throw new PipeException("The PipeTaskStage has been externally stopped");
+      }
+      // otherwise, do nothing to allow retry strategy
+      return;
+    }
+
+    // status == null, register the subtask
+    createSubtask();
+
+    status = PipeStatus.STOPPED;
+  }
+
+  protected abstract void createSubtask() throws PipeException;
+
   /**
    * Start a pipe task stage.
    *
    * @throws PipeException if failed to start a pipe task stage.
    */
-  void start() throws PipeException;
+  public synchronized void start() {
+    if (status == null) {
+      throw new PipeException("The PipeTaskStage has not been created");
+    }
+    if (status == PipeStatus.RUNNING) {
+      // do nothing to allow retry strategy
+      return;
+    }
+    if (status == PipeStatus.DROPPED) {
+      throw new PipeException("The PipeTaskStage has been dropped");
+    }
+
+    // status == PipeStatus.STOPPED, start the subtask
+    startSubtask();
+
+    status = PipeStatus.RUNNING;
+  }
+
+  protected abstract void startSubtask() throws PipeException;
 
   /**
    * Stop a pipe task stage.
    *
    * @throws PipeException if failed to stop a pipe task stage.
    */
-  void stop() throws PipeException;
+  public synchronized void stop() {
+    if (status == null) {
+      throw new PipeException("The PipeTaskStage has not been created");
+    }
+    if (status == PipeStatus.STOPPED) {
+      // do nothing to allow retry strategy
+      return;
+    }
+    if (status == PipeStatus.DROPPED) {
+      throw new PipeException("The PipeTaskStage has been dropped");
+    }
+
+    // status == PipeStatus.RUNNING, stop the subtask
+    stopSubtask();
+
+    status = PipeStatus.STOPPED;
+    hasBeenExternallyStopped = true;
+  }
+
+  protected abstract void stopSubtask() throws PipeException;
 
   /**
    * Drop a pipe task stage.
    *
    * @throws PipeException if failed to drop a pipe task stage.
    */
-  void drop() throws PipeException;
+  public synchronized void drop() {
+    if (status == null) {
+      throw new PipeException("The PipeTaskStage has not been created");
+    }
+    if (status == PipeStatus.DROPPED) {
+      // do nothing to allow retry strategy
+      return;
+    }
+
+    // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the subtask
+    dropSubtask();
+
+    status = PipeStatus.DROPPED;
+  }
+
+  protected abstract void dropSubtask() throws PipeException;
 
   /**
    * Get the pipe subtask.
    *
    * @return the pipe subtask.
    */
-  PipeSubtask getSubtask();
+  public abstract PipeSubtask getSubtask();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index c07f83632c..48aa466bd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -32,53 +33,44 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 public class PipeConnectorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  // input
-  private final ArrayBlockingQueue<Event> pendingQueue;
-  // output
-  private final PipeConnector pipeConnector;
+  private final PendingQueue inputPendingQueue;
+  private final PipeConnector outputPipeConnector;
 
   /** @param taskID connectorAttributeSortedString */
-  public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
+  public PipeConnectorSubtask(
+      String taskID, PendingQueue inputPendingQueue, PipeConnector outputPipeConnector) {
     super(taskID);
-    // TODO: make the size of the queue size reasonable and configurable
-    this.pendingQueue = new ArrayBlockingQueue<>(1024 * 1024);
-    this.pipeConnector = pipeConnector;
-  }
-
-  public ArrayBlockingQueue<Event> getInputPendingQueue() {
-    return pendingQueue;
+    this.inputPendingQueue = inputPendingQueue;
+    this.outputPipeConnector = outputPipeConnector;
   }
 
   // TODO: for a while
   @Override
   protected void executeForAWhile() {
-    if (pendingQueue.isEmpty()) {
-      return;
-    }
-
     try {
       // TODO: reduce the frequency of heartbeat
-      pipeConnector.heartbeat();
+      outputPipeConnector.heartbeat();
     } catch (Exception e) {
       throw new PipeConnectionException(
           "PipeConnector: failed to connect to the target system.", e);
     }
 
-    final Event event = pendingQueue.poll();
+    final Event event = inputPendingQueue.poll();
+    if (event == null) {
+      return;
+    }
 
     try {
       if (event instanceof TabletInsertionEvent) {
-        pipeConnector.transfer((TabletInsertionEvent) event);
+        outputPipeConnector.transfer((TabletInsertionEvent) event);
       } else if (event instanceof TsFileInsertionEvent) {
-        pipeConnector.transfer((TsFileInsertionEvent) event);
+        outputPipeConnector.transfer((TsFileInsertionEvent) event);
       } else if (event instanceof DeletionEvent) {
-        pipeConnector.transfer((DeletionEvent) event);
+        outputPipeConnector.transfer((DeletionEvent) event);
       } else {
         throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
       }
@@ -97,7 +89,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
       int retry = 0;
       while (retry < MAX_RETRY_TIMES) {
         try {
-          pipeConnector.handshake();
+          outputPipeConnector.handshake();
           break;
         } catch (Exception e) {
           retry++;
@@ -131,7 +123,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
   @Override
   public void close() {
     try {
-      pipeConnector.close();
+      outputPipeConnector.close();
     } catch (Exception e) {
       e.printStackTrace();
       LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 34f8045cb6..f863962e96 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -30,35 +31,32 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 public class PipeProcessorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtask.class);
 
-  private final ArrayBlockingQueue<Event> pendingEventQueue;
+  private final PendingQueue inputPendingQueue;
   private final PipeProcessor pipeProcessor;
   private final EventCollector outputEventCollector;
 
   public PipeProcessorSubtask(
       String taskID,
-      ArrayBlockingQueue<Event> pendingEventQueue,
+      PendingQueue inputPendingQueue,
       PipeProcessor pipeProcessor,
       EventCollector outputEventCollector) {
     super(taskID);
+    this.inputPendingQueue = inputPendingQueue;
     this.pipeProcessor = pipeProcessor;
-    this.pendingEventQueue = pendingEventQueue;
     this.outputEventCollector = outputEventCollector;
   }
 
   @Override
   protected void executeForAWhile() {
-    if (pendingEventQueue.isEmpty()) {
+    final Event event = inputPendingQueue.poll();
+    if (event == null) {
       return;
     }
 
-    final Event event = pendingEventQueue.poll();
-
     try {
       if (event instanceof TabletInsertionEvent) {
         pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 6fbc7fe2c6..517cf899a4 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -36,7 +37,9 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
     subtask =
         Mockito.spy(
             new PipeConnectorSubtask(
-                "PipeConnectorSubtaskExecutorTest", mock(PipeConnector.class)) {
+                "PipeConnectorSubtaskExecutorTest",
+                mock(PendingQueue.class),
+                mock(PipeConnector.class)) {
               @Override
               public void executeForAWhile() {}
             });
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index 1abf2ab05e..e6ded5976c 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
+import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -26,8 +27,6 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.junit.Before;
 import org.mockito.Mockito;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 import static org.mockito.Mockito.mock;
 
 public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
@@ -40,7 +39,7 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeProcessorSubtask(
                 "PipeProcessorSubtaskExecutorTest",
-                mock(ArrayBlockingQueue.class),
+                mock(PendingQueue.class),
                 mock(PipeProcessor.class),
                 mock(EventCollector.class)) {
               @Override


[iotdb] 07/08: Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0420bc0deed602822e6acfb5dab95b286b0fb2eb
Merge: 240fe85737 5c20ee49d3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 9 22:15:51 2023 +0800

    Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848

 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  22 ++
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   4 +
 client-py/tests/test_dataframe.py                  |   2 +
 .../confignode/manager/ClusterSchemaManager.java   |  13 +-
 docs/UserGuide/QuickStart/WayToGetIoTDB.md         |   2 +-
 .../apache/iotdb/db/it/query/IoTDBResultSetIT.java |   2 +
 .../db/it/schema/IoTDBAutoCreateSchemaIT.java      |  17 +-
 .../it/schema/IoTDBCreateAlignedTimeseriesIT.java  |  17 +-
 .../db/it/schema/IoTDBCreateStorageGroupIT.java    |  18 +-
 .../db/it/schema/IoTDBCreateTimeseriesIT.java      |  17 +-
 .../db/it/schema/IoTDBDeactivateTemplateIT.java    |  26 +-
 .../it/schema/IoTDBDeleteAlignedTimeseriesIT.java  |  18 +-
 .../db/it/schema/IoTDBDeleteStorageGroupIT.java    |  17 +-
 .../db/it/schema/IoTDBDeleteTimeseriesIT.java      |  18 +-
 .../iotdb/db/it/schema/IoTDBExtendTemplateIT.java  |  49 +--
 .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java   |  63 ++--
 .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java  |  49 +--
 .../db/it/schema/IoTDBSortedShowTimeseriesIT.java  |  20 +-
 .../apache/iotdb/db/it/schema/IoTDBTagAlterIT.java |  17 +-
 .../org/apache/iotdb/db/it/schema/IoTDBTagIT.java  |  17 +-
 .../org/apache/iotdb/util/AbstractSchemaIT.java    |  48 ++-
 .../iotdb/zeppelin/it/IoTDBInterpreterIT.java      |  14 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |   4 +-
 .../node/common/AbstractMeasurementMNode.java      |   4 +
 .../schema/node/role/IMeasurementMNode.java        |   2 +
 .../commons/schema/node/utils/IMNodeFactory.java   |   4 +
 .../schemaregion/rocksdb/RSchemaRegion.java        |   7 +
 .../rocksdb/mnode/RMeasurementMNode.java           |   5 +
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |   7 +
 .../CompactionMemoryNotEnoughException.java}       |  20 +-
 .../execute/task/CrossSpaceCompactionTask.java     |   6 +-
 .../apache/iotdb/db/metadata/MetadataConstant.java |   6 +
 .../mnode/config/factory/ConfigMNodeFactory.java   |   7 +
 .../mnode/mem/factory/MemMNodeFactory.java         |  14 +
 ...MeasurementMNode.java => LogicalViewMNode.java} |  30 +-
 .../metadata/mnode/mem/impl/LogicalViewSchema.java | 215 +++++++++++
 .../metadata/mnode/mem/impl/MeasurementMNode.java  |   5 +
 .../metadata/mnode/mem/info/LogicalViewInfo.java   | 161 +++++++++
 .../schemafile/factory/CacheMNodeFactory.java      |   7 +
 .../schemafile/impl/CachedMeasurementMNode.java    |   5 +
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  10 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  75 +++-
 .../mtree/snapshot/MemMTreeSnapshotUtil.java       |  41 ++-
 .../plan/schemaregion/SchemaRegionPlanType.java    |   2 +
 .../plan/schemaregion/SchemaRegionPlanVisitor.java |   5 +
 .../impl/SchemaRegionPlanDeserializer.java         |  24 ++
 .../impl/SchemaRegionPlanSerializer.java           |  27 ++
 .../impl/SchemaRegionPlanTxtSerializer.java        |  21 ++
 .../impl/write/CreateLogicalViewPlanImpl.java      |  75 ++++
 .../schemaregion/result/ShowTimeSeriesResult.java  |  14 +-
 .../schemaregion/write/ICreateLogicalViewPlan.java |  66 ++++
 .../metadata/query/info/ITimeSeriesSchemaInfo.java |   6 +-
 .../metadata/rescon/MemSchemaRegionStatistics.java |   8 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   5 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |  40 +++
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   7 +
 .../metadata/template/ClusterTemplateManager.java  |  36 ++
 .../view/viewExpression/ViewExpression.java        | 318 +++++++++++++++++
 .../view/viewExpression/ViewExpressionType.java    |  76 ++++
 .../binary/BinaryViewExpression.java               | 119 +++++++
 .../binary/arithmetic/AdditionViewExpression.java  |  61 ++++
 .../arithmetic/ArithmeticBinaryViewExpression.java |  52 +++
 .../binary/arithmetic/DivisionViewExpression.java  |  61 ++++
 .../binary/arithmetic/ModuloViewExpression.java    |  61 ++++
 .../arithmetic/MultiplicationViewExpression.java   |  63 ++++
 .../arithmetic/SubtractionViewExpression.java      |  61 ++++
 .../compare/CompareBinaryViewExpression.java       |  52 +++
 .../binary/compare/EqualToViewExpression.java      |  57 +++
 .../binary/compare/GreaterEqualViewExpression.java |  57 +++
 .../binary/compare/GreaterThanViewExpression.java  |  57 +++
 .../binary/compare/LessEqualViewExpression.java    |  57 +++
 .../binary/compare/LessThanViewExpression.java     |  57 +++
 .../binary/compare/NonEqualViewExpression.java     |  57 +++
 .../binary/logic/LogicAndViewExpression.java       |  57 +++
 .../binary/logic/LogicBinaryViewExpression.java    |  52 +++
 .../binary/logic/LogicOrViewExpression.java        |  57 +++
 .../viewExpression/leaf/ConstantViewOperand.java   |  98 ++++++
 .../viewExpression/leaf/LeafViewOperand.java}      |  28 +-
 .../view/viewExpression/leaf/NullViewOperand.java  |  61 ++++
 .../viewExpression/leaf/TimeSeriesViewOperand.java |  87 +++++
 .../viewExpression/leaf/TimestampViewOperand.java  |  72 ++++
 .../multi/FunctionViewExpression.java              | 198 +++++++++++
 .../ternary/BetweenViewExpression.java             | 111 ++++++
 .../ternary/TernaryViewExpression.java             | 105 ++++++
 .../viewExpression/unary/InViewExpression.java     | 106 ++++++
 .../viewExpression/unary/IsNullViewExpression.java |  89 +++++
 .../viewExpression/unary/LikeViewExpression.java   | 163 +++++++++
 .../unary/LogicNotViewExpression.java              |  61 ++++
 .../unary/NegationViewExpression.java              |  61 ++++
 .../unary/RegularViewExpression.java               | 105 ++++++
 .../viewExpression/unary/UnaryViewExpression.java  |  73 ++++
 .../visitor/GetSourcePathsVisitor.java             |  95 +++++
 .../visitor/TransformToExpressionVisitor.java      | 320 +++++++++++++++++
 .../visitor/ViewExpressionVisitor.java             | 219 ++++++++++++
 .../metadata/visitor/SchemaExecutionVisitor.java   |  22 ++
 .../db/mpp/common/header/ColumnHeaderConstant.java |   6 +-
 .../execution/executor/RegionWriteExecutor.java    |  49 +++
 .../schema/source/TimeSeriesSchemaSource.java      |  14 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  45 +++
 .../visitor/TransformToViewExpressionVisitor.java  | 391 +++++++++++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  86 +++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  21 ++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../node/metedata/write/CreateLogicalViewNode.java | 250 +++++++++++++
 .../iotdb/db/mpp/plan/statement/StatementType.java |   2 +
 .../db/mpp/plan/statement/StatementVisitor.java    |   7 +
 .../metadata/CreateLogicalViewStatement.java       | 246 +++++++++++++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |  20 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   1 +
 .../metadata/view/ViewExpressionToStringTest.java  | 183 ++++++++++
 111 files changed, 6057 insertions(+), 222 deletions(-)


[iotdb] 01/08: Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 500e2b854cb7c0cfe70413001ae7e77f211d78ad
Merge: ad0d1fae91 e39060e1d1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 8 21:23:50 2023 +0800

    Merge branch 'master' of github.com:apache/iotdb into IOTDB-5848

 consensus/pom.xml                                  |  2 +-
 .../iotdb/consensus/config/IoTConsensusConfig.java | 36 ++++----
 .../consensus/iot/logdispatcher/LogDispatcher.java |  8 +-
 .../logdispatcher/LogDispatcherThreadMetrics.java  | 22 +++++
 .../consensus/iot/logdispatcher/SyncStatus.java    |  2 -
 docs/UserGuide/API/Programming-Java-Native-API.md  | 98 ----------------------
 docs/UserGuide/Reference/Common-Config-Manual.md   | 29 ++++---
 docs/UserGuide/Reference/DataNode-Config-Manual.md |  4 +-
 .../UserGuide/API/Programming-Java-Native-API.md   | 93 --------------------
 .../zh/UserGuide/Reference/Common-Config-Manual.md | 29 ++++---
 .../UserGuide/Reference/DataNode-Config-Manual.md  |  4 +-
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  | 30 +++++++
 .../resources/conf/iotdb-common.properties         | 15 +++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 +++++----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 26 +++---
 .../operator/process/DeviceViewIntoOperator.java   |  6 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  9 +-
 17 files changed, 175 insertions(+), 278 deletions(-)


[iotdb] 08/08: ListenablePendingQueue

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f29861d7a90f8501682201ee19dc3e82bf0c429
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 10 00:51:46 2023 +0800

    ListenablePendingQueue
---
 .../core/collector/IoTDBDataRegionCollector.java   |  4 +--
 .../PipeRealtimeDataRegionHybridCollector.java     | 13 +++++-----
 .../connector/PipeConnectorSubtaskLifeCycle.java   | 30 +++++++++++-----------
 .../connector/PipeConnectorSubtaskManager.java     |  8 +++---
 .../event/view/collector/PipeEventCollector.java   |  6 ++---
 .../pipe/task/{binder => queue}/EventSupplier.java |  2 +-
 .../ListenableBlockingPendingQueue.java}           | 13 +++++++---
 .../ListenablePendingQueue.java}                   | 22 ++++++----------
 .../ListenableUnblockingPendingQueue.java}         | 13 +++++++---
 .../PendingQueueEmptyToNotEmptyListener.java       |  2 +-
 .../PendingQueueFullToNotFullListener.java         |  2 +-
 .../PendingQueueNotEmptyToEmptyListener.java       |  2 +-
 .../PendingQueueNotFullToFullListener.java         |  2 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 10 +++-----
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  4 +--
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 13 +++++-----
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  8 +++---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  2 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  9 ++++---
 .../executor/PipeConnectorSubtaskExecutorTest.java |  4 +--
 .../executor/PipeProcessorSubtaskExecutorTest.java |  2 +-
 21 files changed, 90 insertions(+), 81 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index d5638d67a5..d7526fa187 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -39,7 +39,7 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   // TODO: support pattern in historical collector
   private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
 
-  public IoTDBDataRegionCollector(PendingQueue<Event> collectorPendingQueue) {
+  public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
     realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
     historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 9115d91ef8..d8b79fbfcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector.realtime;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
@@ -37,9 +37,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final PendingQueue<Event> pendingQueue;
+  private final ListenableUnblockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionHybridCollector(PendingQueue<Event> pendingQueue) {
+  public PipeRealtimeDataRegionHybridCollector(
+      ListenableUnblockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
   }
 
@@ -87,9 +88,9 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
           String.format(
               "Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
-      // TODO: more degradation strategies
-      // TODO: dynamic control of the pending queue capacity
-      // TODO: should be handled by the PipeRuntimeAgent
+      // this would not happen, but just in case.
+      // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
index f794a7a7a7..9ed53ae310 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -28,7 +28,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
-  private final PendingQueue<Event> pendingQueue;
+  private final ListenableBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
@@ -36,20 +36,20 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
       PipeConnectorSubtask subtask,
-      PendingQueue<Event> pendingQueue) {
+      ListenableBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
-    this.pendingQueue =
-        pendingQueue
-            .registerEmptyToNotEmptyListener(
-                subtask.getTaskID(),
-                () -> {
-                  if (hasRunningTasks()) {
-                    executor.start(subtask.getTaskID());
-                  }
-                })
-            .registerNotEmptyToEmptyListener(
-                subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
+    this.pendingQueue = pendingQueue;
+
+    pendingQueue.registerEmptyToNotEmptyListener(
+        subtask.getTaskID(),
+        () -> {
+          if (hasRunningTasks()) {
+            executor.start(subtask.getTaskID());
+          }
+        });
+    this.pendingQueue.registerNotEmptyToEmptyListener(
+        subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
 
     runningTaskCount = 0;
     aliveTaskCount = 0;
@@ -59,7 +59,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
-  public PendingQueue<Event> getPendingQueue() {
+  public ListenableBlockingPendingQueue<Event> getPendingQueue() {
     return pendingQueue;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index feb2797022..070e4e05b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -45,7 +45,8 @@ public class PipeConnectorSubtaskManager {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
       // TODO: make pendingQueue size configurable
-      final PendingQueue<Event> pendingQueue = new PendingQueue<>(1024 * 1024);
+      final ListenableBlockingPendingQueue<Event> pendingQueue =
+          new ListenableBlockingPendingQueue<>(65535);
       final PipeConnectorSubtask pipeConnectorSubtask =
           new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
@@ -97,7 +98,8 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
-  public PendingQueue<Event> getPipeConnectorPendingQueue(String attributeSortedString) {
+  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+      String attributeSortedString) {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       throw new PipeException(
           "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 01289395b8..0d1d60fdde 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -31,7 +31,7 @@ import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
-  private final PendingQueue<Event> pendingQueue;
+  private final ListenableBlockingPendingQueue<Event> pendingQueue;
 
   // buffer queue is used to store events that are not offered to pending queue
   // because the pending queue is full. when pending queue is full, pending queue
@@ -41,7 +41,7 @@ public class PipeEventCollector implements EventCollector {
   // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
-  public PipeEventCollector(PendingQueue<Event> pendingQueue) {
+  public PipeEventCollector(ListenableBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
     bufferQueue = new LinkedList<>();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
index 01abb027d2..ea056dc22a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/EventSupplier.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
similarity index 69%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index fe84f3ca4b..ab3090a0ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -17,10 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
-@FunctionalInterface
-public interface PendingQueueNotFullToFullListener {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  void onPendingQueueNotFullToFull();
+import org.eclipse.jetty.util.BlockingArrayQueue;
+
+public class ListenableBlockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+
+  public ListenableBlockingPendingQueue(int pendingQueueSize) {
+    super(new BlockingArrayQueue<>(pendingQueueSize));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
index 1e123b7390..f476d88053 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
@@ -17,17 +17,16 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PendingQueue<E extends Event> {
+public abstract class ListenablePendingQueue<E extends Event> {
 
   private final Queue<E> pendingQueue;
 
@@ -42,12 +41,11 @@ public class PendingQueue<E extends Event> {
 
   private final AtomicBoolean isFull = new AtomicBoolean(false);
 
-  public PendingQueue(int pendingQueueSize) {
-    // TODO: make the size of the queue size reasonable and configurable
-    this.pendingQueue = new ArrayBlockingQueue<>(pendingQueueSize);
+  protected ListenablePendingQueue(Queue<E> pendingQueue) {
+    this.pendingQueue = pendingQueue;
   }
 
-  public PendingQueue<E> registerEmptyToNotEmptyListener(
+  public ListenablePendingQueue<E> registerEmptyToNotEmptyListener(
       String id, PendingQueueEmptyToNotEmptyListener listener) {
     emptyToNotEmptyListeners.put(id, listener);
     return this;
@@ -63,7 +61,7 @@ public class PendingQueue<E extends Event> {
         .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
   }
 
-  public PendingQueue<E> registerNotEmptyToEmptyListener(
+  public ListenablePendingQueue<E> registerNotEmptyToEmptyListener(
       String id, PendingQueueNotEmptyToEmptyListener listener) {
     notEmptyToEmptyListeners.put(id, listener);
     return this;
@@ -79,7 +77,7 @@ public class PendingQueue<E extends Event> {
         .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
   }
 
-  public PendingQueue<E> registerFullToNotFullListener(
+  public ListenablePendingQueue<E> registerFullToNotFullListener(
       String id, PendingQueueFullToNotFullListener listener) {
     fullToNotFullListeners.put(id, listener);
     return this;
@@ -95,7 +93,7 @@ public class PendingQueue<E extends Event> {
         .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
   }
 
-  public PendingQueue<E> registerNotFullToFullListener(
+  public ListenablePendingQueue<E> registerNotFullToFullListener(
       String id, PendingQueueNotFullToFullListener listener) {
     notFullToFullListeners.put(id, listener);
     return this;
@@ -158,8 +156,4 @@ public class PendingQueue<E extends Event> {
   public int size() {
     return pendingQueue.size();
   }
-
-  public void disable() {
-    pendingQueue = null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
index fe84f3ca4b..c2772b4eeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
@@ -17,10 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
-@FunctionalInterface
-public interface PendingQueueNotFullToFullListener {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  void onPendingQueueNotFullToFull();
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ListenableUnblockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+
+  public ListenableUnblockingPendingQueue() {
+    super(new ConcurrentLinkedQueue<>());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
index 6d855cdb46..d56b6e789e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueEmptyToNotEmptyListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueEmptyToNotEmptyListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
index 91b6192c2a..33225505f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueFullToNotFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueFullToNotFullListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
index 80b89045bf..4225783739 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotEmptyToEmptyListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueNotEmptyToEmptyListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
index fe84f3ca4b..2433cd4b8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/binder/PendingQueueNotFullToFullListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.binder;
+package org.apache.iotdb.db.pipe.task.queue;
 
 @FunctionalInterface
 public interface PendingQueueNotFullToFullListener {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 770614cb6f..161890af40 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -22,9 +22,8 @@ package org.apache.iotdb.db.pipe.task.stage;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -45,7 +44,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
    * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
    * queue is not empty.
    */
-  private PendingQueue<Event> collectorPendingQueue;
+  private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
 
   private PipeCollector pipeCollector;
 
@@ -63,8 +62,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
             PipeCollectorConstant.COLLECTOR_KEY,
             BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
         .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
-      collectorPendingQueue =
-          new PendingQueue<>(PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+      collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
       this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
     } else {
       this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
@@ -94,7 +92,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
     }
   }
 
-  public PendingQueue<Event> getCollectorPendingQueue() {
+  public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
     return collectorPendingQueue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index e385b1266f..560ee2ae86 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -60,7 +60,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
   }
 
-  public PendingQueue<Event> getPipeConnectorPendingQueue() {
+  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
     return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 8aa112d04d..503e97c6e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -24,8 +24,9 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -41,8 +42,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
   protected final PipeProcessorSubtask subtask;
 
-  protected final PendingQueue<Event> pipeCollectorInputPendingQueue;
-  protected final PendingQueue<Event> pipeConnectorOutputPendingQueue;
+  protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
+  protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
 
   /**
    * @param pipeName pipe name
@@ -57,9 +58,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       String pipeName,
       String dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
-      @Nullable PendingQueue<Event> pipeCollectorInputPendingQueue,
+      @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
-      PendingQueue<Event> pipeConnectorOutputPendingQueue) {
+      ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     final String taskId = pipeName + "_" + dataRegionId;
     final PipeProcessor pipeProcessor =
         PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 48aa466bd8..5de09bdd8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -37,12 +37,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  private final PendingQueue inputPendingQueue;
+  private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
-      String taskID, PendingQueue inputPendingQueue, PipeConnector outputPipeConnector) {
+      String taskID,
+      ListenableBlockingPendingQueue<Event> inputPendingQueue,
+      PipeConnector outputPipeConnector) {
     super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index cc94de2bb7..3b7a59aa9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
-import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 46ce856b82..214b8b0774 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.EventType;
@@ -83,13 +84,13 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue)) {
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 517cf899a4..11007ebf3e 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.binder.PendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -38,7 +38,7 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeConnectorSubtask(
                 "PipeConnectorSubtaskExecutorTest",
-                mock(PendingQueue.class),
+                mock(ListenableBlockingPendingQueue.class),
                 mock(PipeConnector.class)) {
               @Override
               public void executeForAWhile() {}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index 88997a0ca7..027405d872 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;


[iotdb] 06/08: dispatch realtime events when processing

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5848
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 240fe85737113debdc32bc514da7607c5a3b8ddd
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 9 22:15:40 2023 +0800

    dispatch realtime events when processing
---
 .../db/pipe/core/event/view/collector/PipeEventCollector.java     | 4 ++--
 .../apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java   | 8 +++++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index a9d9232150..01289395b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -31,7 +31,7 @@ import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
-  private final PendingQueue pendingQueue;
+  private final PendingQueue<Event> pendingQueue;
 
   // buffer queue is used to store events that are not offered to pending queue
   // because the pending queue is full. when pending queue is full, pending queue
@@ -41,7 +41,7 @@ public class PipeEventCollector implements EventCollector {
   // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
-  public PipeEventCollector(PendingQueue pendingQueue) {
+  public PipeEventCollector(PendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
     bufferQueue = new LinkedList<>();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 96cd6a70f3..cc94de2bb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.task.binder.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -52,11 +53,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
   @Override
   protected void executeForAWhile() throws Exception {
-    final Event event = inputEventSupplier.supply();
+    Event event = inputEventSupplier.supply();
     if (event == null) {
       return;
     }
 
+    if (event instanceof PipeRealtimeCollectEvent) {
+      // dispatch the event
+      event = ((PipeRealtimeCollectEvent) event).getEvent();
+    }
+
     try {
       if (event instanceof TabletInsertionEvent) {
         pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);