You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/10 04:46:10 UTC

[iotdb] branch master updated: [IOTDB-5848] Pipe: task construction and life cycle management (#9808)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a3adc2eb [IOTDB-5848] Pipe: task construction and life cycle management (#9808)
15a3adc2eb is described below

commit 15a3adc2eb2ea6c341a192b5cc75866d35aca75b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed May 10 12:46:01 2023 +0800

    [IOTDB-5848] Pipe: task construction and life cycle management (#9808)
    
    * introduce a reconnect strategy for when the connection is broken.
    
    * handle start / stop / create / drop pipe requests by managing local tasks on data nodes.
    
    * provide a mechanism to handle idleness and backpressure of pipe tasks.
---
 .../commons/pipe/task/meta/PipeStaticMeta.java     |   4 +-
 .../api/exception/PipeConnectionException.java     |  18 +--
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  25 +++-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 139 +++++++++---------
 .../db/pipe/config/PipeCollectorConstant.java      |   3 +-
 ...torConstant.java => PipeConnectorConstant.java} |   7 +-
 ...torConstant.java => PipeProcessorConstant.java} |   7 +-
 .../core/collector/IoTDBDataRegionCollector.java   |   5 +-
 .../PipeHistoricalDataRegionTsFileCollector.java   |   4 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |   4 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |  22 ++-
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  31 +++-
 .../connector/PipeConnectorSubtaskManager.java     |  19 ++-
 .../event/view/collector/PipeEventCollector.java   |  59 +++++++-
 ...anager.java => PipeSubtaskExecutorManager.java} |  12 +-
 .../execution/scheduler/PipeTaskScheduler.java     |  18 +--
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |  66 +++++++++
 .../org/apache/iotdb/db/pipe/task/PipeTask.java    |   9 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  60 +++++++-
 .../apache/iotdb/db/pipe/task/PipeTaskManager.java |  96 +++++++++++++
 .../EventSupplier.java}                            |  24 ++--
 .../queue/ListenableBlockingPendingQueue.java}     |  21 +--
 .../db/pipe/task/queue/ListenablePendingQueue.java | 159 +++++++++++++++++++++
 .../ListenableUnblockingPendingQueue.java}         |  18 +--
 .../PendingQueueEmptyToNotEmptyListener.java}      |   9 +-
 .../PendingQueueFullToNotFullListener.java}        |   9 +-
 .../PendingQueueNotEmptyToEmptyListener.java}      |   9 +-
 .../PendingQueueNotFullToFullListener.java}        |   9 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  55 +++++--
 .../db/pipe/task/stage/PipeTaskConnectorStage.java | 100 +++----------
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 105 +++++++++++---
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  95 ++++++++++--
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  87 ++++++++---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  20 +--
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |   6 +-
 .../collector/CachedSchemaPatternMatcherTest.java  |   6 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  17 +--
 .../executor/PipeConnectorSubtaskExecutorTest.java |   8 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |  10 +-
 39 files changed, 1009 insertions(+), 366 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index eab7408386..f487de445c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 public class PipeStaticMeta {
 
@@ -159,7 +160,8 @@ public class PipeStaticMeta {
 
   @Override
   public int hashCode() {
-    return pipeName.hashCode();
+    return Objects.hash(
+        pipeName, creationTime, collectorParameters, processorParameters, connectorParameters);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
index 39ffaefeeb..14cacc73fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java
@@ -17,21 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.pipe.api.exception;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+public class PipeConnectionException extends PipeException {
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
+  public PipeConnectionException(String message) {
+    super(message);
   }
 
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
-
-  @Override
-  public void close() {
-    // TODO
+  public PipeConnectionException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d0b42f6fa2..3474edddf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -19,12 +19,15 @@
 
 package org.apache.iotdb.db.pipe.agent.plugin;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.PipePlugin;
@@ -191,16 +194,28 @@ public class PipePluginAgent {
   }
 
   public PipeCollector reflectCollector(PipeParameters collectorParameters) {
-    return new IoTDBDataRegionCollector(); // TODO: reflect plugin, use PipeIoTDBCollector as
-    // default collector
+    return (PipeCollector)
+        reflect(
+            collectorParameters.getStringOrDefault(
+                PipeCollectorConstant.COLLECTOR_KEY,
+                BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName()));
   }
 
   public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return (PipeProcessor)
+        reflect(
+            processorParameters.getStringOrDefault(
+                PipeProcessorConstant.PROCESSOR_KEY,
+                BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()));
   }
 
   public PipeConnector reflectConnector(PipeParameters connectorParameters) {
-    throw new UnsupportedOperationException("Not supported yet.");
+    if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+      throw new PipeManagementException(
+          "Failed to reflect PipeConnector instance because 'connector' is not specified in the parameters.");
+    }
+    return (PipeConnector)
+        reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY));
   }
 
   private PipePlugin reflect(String pluginName) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index c0df1e1761..13cb1f5ab5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -23,11 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.task.PipeBuilder;
+import org.apache.iotdb.db.pipe.task.PipeTask;
+import org.apache.iotdb.db.pipe.task.PipeTaskManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * State transition diagram of a pipe task:
  *
@@ -46,14 +50,16 @@ public class PipeTaskAgent {
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
 
   private final PipeMetaKeeper pipeMetaKeeper;
+  private final PipeTaskManager pipeTaskManager;
 
   public PipeTaskAgent() {
     pipeMetaKeeper = new PipeMetaKeeper();
+    pipeTaskManager = new PipeTaskManager();
   }
 
   ////////////////////////// Pipe Task Management //////////////////////////
 
-  public void createPipe(PipeMeta pipeMeta) {
+  public synchronized void createPipe(PipeMeta pipeMeta) {
     final String pipeName = pipeMeta.getStaticMeta().getPipeName();
     final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
 
@@ -91,28 +97,20 @@ public class PipeTaskAgent {
       dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
     }
 
-    // build pipe task by consensus group
-    pipeMeta
-        .getRuntimeMeta()
-        .getConsensusGroupIdToTaskMetaMap()
-        .forEach(
-            ((consensusGroupId, pipeTaskMeta) -> {
-              createPipeTaskByConsensusGroup(
-                  pipeName, creationTime, consensusGroupId, pipeTaskMeta);
-            }));
+    // create pipe tasks and trigger create() method for each pipe task
+    final Map<TConsensusGroupId, PipeTask> pipeTasks = new PipeBuilder(pipeMeta).build();
+    for (PipeTask pipeTask : pipeTasks.values()) {
+      pipeTask.create();
+    }
+    pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks);
+
     // add pipe meta to pipe meta keeper
     // note that we do not need to set the status of pipe meta here, because the status of pipe meta
     // is already set to STOPPED when it is created
     pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
   }
 
-  public void createPipeTaskByConsensusGroup(
-      String pipeName,
-      long creationTime,
-      TConsensusGroupId consensusGroupId,
-      PipeTaskMeta pipeTaskMeta) {}
-
-  public void dropPipe(String pipeName, long creationTime) {
+  public synchronized void dropPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -135,22 +133,26 @@ public class PipeTaskAgent {
     // but the pipe task meta has not been cleaned up (in case of failure when executing
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
-    // drop pipe task by consensus group
-    existedPipeMeta
-        .getRuntimeMeta()
-        .getConsensusGroupIdToTaskMetaMap()
-        .forEach(
-            ((consensusGroupId, pipeTaskMeta) -> {
-              dropPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
-            }));
+
+    // drop pipe tasks and trigger drop() method for each pipe task
+    final Map<TConsensusGroupId, PipeTask> pipeTasks =
+        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
+    if (pipeTasks == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    for (PipeTask pipeTask : pipeTasks.values()) {
+      pipeTask.drop();
+    }
+
     // remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
   }
 
-  public void dropPipeTaskByConsensusGroup(
-      String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
-
-  public void dropPipe(String pipeName) {
+  public synchronized void dropPipe(String pipeName) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -163,21 +165,24 @@ public class PipeTaskAgent {
     // but the pipe task meta has not been cleaned up (in case of failure when executing
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
-    // drop pipe task by consensus group
-    existedPipeMeta
-        .getRuntimeMeta()
-        .getConsensusGroupIdToTaskMetaMap()
-        .forEach(
-            ((consensusGroupId, pipeTaskMeta) -> {
-              dropPipeTaskByConsensusGroup(pipeName, consensusGroupId);
-            }));
+
+    // drop pipe tasks and trigger drop() method for each pipe task
+    final Map<TConsensusGroupId, PipeTask> pipeTasks =
+        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
+    if (pipeTasks == null) {
+      LOGGER.info(
+          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
+      return;
+    }
+    for (PipeTask pipeTask : pipeTasks.values()) {
+      pipeTask.drop();
+    }
+
     // remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
   }
 
-  public void dropPipeTaskByConsensusGroup(String pipeName, TConsensusGroupId consensusGroupId) {}
-
-  public void startPipe(String pipeName, long creationTime) {
+  public synchronized void startPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -223,22 +228,25 @@ public class PipeTaskAgent {
             "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
     }
 
-    // start pipe task by consensus group
-    existedPipeMeta
-        .getRuntimeMeta()
-        .getConsensusGroupIdToTaskMetaMap()
-        .forEach(
-            ((consensusGroupId, pipeTaskMeta) -> {
-              startPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
-            }));
+    // trigger start() method for each pipe task
+    final Map<TConsensusGroupId, PipeTask> pipeTasks =
+        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
+    if (pipeTasks == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    for (PipeTask pipeTask : pipeTasks.values()) {
+      pipeTask.start();
+    }
+
     // set pipe meta status to RUNNING
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
   }
 
-  public void startPipeTaskByConsensusGroup(
-      String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
-
-  public void stopPipe(String pipeName, long creationTime) {
+  public synchronized void stopPipe(String pipeName, long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (existedPipeMeta == null) {
@@ -284,18 +292,21 @@ public class PipeTaskAgent {
             "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
     }
 
-    // stop pipe task by consensus group
-    existedPipeMeta
-        .getRuntimeMeta()
-        .getConsensusGroupIdToTaskMetaMap()
-        .forEach(
-            ((consensusGroupId, pipeTaskMeta) -> {
-              stopPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
-            }));
+    // trigger stop() method for each pipe task
+    final Map<TConsensusGroupId, PipeTask> pipeTasks =
+        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
+    if (pipeTasks == null) {
+      LOGGER.info(
+          "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.",
+          pipeName,
+          creationTime);
+      return;
+    }
+    for (PipeTask pipeTask : pipeTasks.values()) {
+      pipeTask.stop();
+    }
+
     // set pipe meta status to STOPPED
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
-
-  public void stopPipeTaskByConsensusGroup(
-      String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index b75d2168da..5906de3a49 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -21,8 +21,9 @@ package org.apache.iotdb.db.pipe.config;
 
 public class PipeCollectorConstant {
 
+  public static final String COLLECTOR_KEY = "collector";
   public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+  public static final String DATA_REGION_KEY = "collector.data-region";
 
   private PipeCollectorConstant() {
     throw new IllegalStateException("Utility class");
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
index b75d2168da..67a637503b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.db.pipe.config;
 
-public class PipeCollectorConstant {
+public class PipeConnectorConstant {
 
-  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+  public static final String CONNECTOR_KEY = "connector";
 
-  private PipeCollectorConstant() {
+  private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
index b75d2168da..1af34f3ef3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeProcessorConstant.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.db.pipe.config;
 
-public class PipeCollectorConstant {
+public class PipeProcessorConstant {
 
-  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
-  public static final String PATTERN_DATA_REGION_KEY = "collector.data-region";
+  public static final String PROCESSOR_KEY = "processor";
 
-  private PipeCollectorConstant() {
+  private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index 0fc9fdf69e..d7526fa187 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.core.collector;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -38,9 +39,9 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   // TODO: support pattern in historical collector
   private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
 
-  public IoTDBDataRegionCollector() {
+  public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
-    realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
+    realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
     historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 9ba2f3f9b7..4559ca8a53 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -43,13 +43,13 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
-    dataRegionId = parameters.getInt(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 0825b14023..08e731df13 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -35,14 +35,14 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
-    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
     pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
-    dataRegionId = parameters.getString(PipeCollectorConstant.PATTERN_DATA_REGION_KEY);
+    dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index a48654d292..d8b79fbfcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -22,13 +22,12 @@ package org.apache.iotdb.db.pipe.core.collector.realtime;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 // TODO: make this collector as a builtin pipe plugin. register it in BuiltinPipePlugin.
 public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegionCollector {
 
@@ -38,12 +37,11 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final ArrayBlockingQueue<PipeRealtimeCollectEvent> pendingQueue;
+  private final ListenableUnblockingPendingQueue<Event> pendingQueue;
 
-  public PipeRealtimeDataRegionHybridCollector() {
-    this.pendingQueue =
-        new ArrayBlockingQueue<>(
-            PipeConfig.getInstance().getRealtimeCollectorPendingQueueCapacity());
+  public PipeRealtimeDataRegionHybridCollector(
+      ListenableUnblockingPendingQueue<Event> pendingQueue) {
+    this.pendingQueue = pendingQueue;
   }
 
   @Override
@@ -90,9 +88,9 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
           String.format(
               "Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
-      // TODO: more degradation strategies
-      // TODO: dynamic control of the pending queue capacity
-      // TODO: should be handled by the PipeRuntimeAgent
+      // this would not happen, but just in case.
+      // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+      // TODO: memory control when elements in queue are too many.
     }
   }
 
@@ -103,7 +101,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
   @Override
   public Event supply() {
-    PipeRealtimeCollectEvent collectEvent = pendingQueue.poll();
+    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
 
     while (collectEvent != null) {
       Event suppliedEvent;
@@ -124,7 +122,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
         return suppliedEvent;
       }
 
-      collectEvent = pendingQueue.poll();
+      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
     }
 
     // means the pending queue is empty.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
index 7e71ae2f7a..9ed53ae310 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
@@ -20,20 +20,36 @@
 package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
 
 public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
+  private final ListenableBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
 
   public PipeConnectorSubtaskLifeCycle(
-      PipeConnectorSubtaskExecutor executor, PipeConnectorSubtask subtask) {
+      PipeConnectorSubtaskExecutor executor,
+      PipeConnectorSubtask subtask,
+      ListenableBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
+    this.pendingQueue = pendingQueue;
+
+    pendingQueue.registerEmptyToNotEmptyListener(
+        subtask.getTaskID(),
+        () -> {
+          if (hasRunningTasks()) {
+            executor.start(subtask.getTaskID());
+          }
+        });
+    this.pendingQueue.registerNotEmptyToEmptyListener(
+        subtask.getTaskID(), () -> executor.stop(subtask.getTaskID()));
 
     runningTaskCount = 0;
     aliveTaskCount = 0;
@@ -43,6 +59,10 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
+  public ListenableBlockingPendingQueue<Event> getPendingQueue() {
+    return pendingQueue;
+  }
+
   public synchronized void register() {
     if (aliveTaskCount < 0) {
       throw new IllegalStateException("aliveTaskCount < 0");
@@ -63,7 +83,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
       throw new IllegalStateException("aliveTaskCount <= 0");
     }
     if (aliveTaskCount == 1) {
-      executor.deregister(subtask.getTaskID());
+      close();
       // this subtask is out of life cycle, should never be used again
       return true;
     }
@@ -93,6 +113,13 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   @Override
   public synchronized void close() {
+    pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID());
+    pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID());
+
     executor.deregister(subtask.getTaskID());
   }
+
+  private synchronized boolean hasRunningTasks() {
+    return runningTaskCount > 0;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index a00cedf7b5..070e4e05b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.pipe.core.connector;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.HashMap;
@@ -42,10 +44,13 @@ public class PipeConnectorSubtaskManager {
 
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
+      // TODO: make pendingQueue size configurable
+      final ListenableBlockingPendingQueue<Event> pendingQueue =
+          new ListenableBlockingPendingQueue<>(65535);
       final PipeConnectorSubtask pipeConnectorSubtask =
-          new PipeConnectorSubtask(attributeSortedString, pipeConnector);
+          new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
-          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask);
+          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
       attributeSortedString2SubtaskLifeCycleMap.put(
           attributeSortedString, pipeConnectorSubtaskLifeCycle);
     }
@@ -93,6 +98,16 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
+  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+      String attributeSortedString) {
+    if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
+      throw new PipeException(
+          "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
+    }
+
+    return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+  }
+
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
   private PipeConnectorSubtaskManager() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 7cc0778193..0d1d60fdde 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,21 +19,72 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
-import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
+  private final ListenableBlockingPendingQueue<Event> pendingQueue;
+
+  // The buffer queue stores events that could not be offered to the pending queue
+  // because the pending queue was full. When the pending queue becomes full, it
+  // notifies the tasks to stop collecting events, and the buffer queue stores the
+  // events that arrive before the tasks have actually stopped. Once the pending queue
+  // is no longer full and the tasks are notified to start collecting again, new events
+  // stay in the buffer queue until the buffered events are offered to the pending queue.
+  private final Queue<Event> bufferQueue;
+
+  public PipeEventCollector(ListenableBlockingPendingQueue<Event> pendingQueue) {
+    this.pendingQueue = pendingQueue;
+    bufferQueue = new LinkedList<>();
+  }
+
   @Override
-  public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
+  public void collectTabletInsertionEvent(TabletInsertionEvent event) {
+    collect(event);
+  }
 
   @Override
-  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
+    collect(event);
+  }
 
   @Override
-  public void collectDeletionEvent(DeletionEvent event) throws IOException {}
+  public void collectDeletionEvent(DeletionEvent event) {
+    collect(event);
+  }
+
+  private synchronized void collect(Event event) {
+    while (!bufferQueue.isEmpty()) {
+      final Event bufferedEvent = bufferQueue.peek();
+      if (pendingQueue.offer(bufferedEvent)) {
+        bufferQueue.poll();
+      } else {
+        bufferQueue.offer(event);
+        return;
+      }
+    }
+
+    if (!pendingQueue.offer(event)) {
+      bufferQueue.offer(event);
+    }
+  }
+
+  public synchronized void tryCollectBufferedEvents() {
+    while (!bufferQueue.isEmpty()) {
+      final Event bufferedEvent = bufferQueue.peek();
+      if (pendingQueue.offer(bufferedEvent)) {
+        bufferQueue.poll();
+      } else {
+        return;
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
index 8698a23d86..fc6a335ad0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutorManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
@@ -26,9 +26,9 @@ import org.slf4j.LoggerFactory;
  * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
  * PipeTaskScheduler. It is a singleton class.
  */
-public class PipeTaskExecutorManager {
+public class PipeSubtaskExecutorManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskExecutorManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutorManager.class);
 
   private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor;
   private final PipeProcessorSubtaskExecutor processorSubtaskExecutor;
@@ -48,19 +48,19 @@ public class PipeTaskExecutorManager {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
-  private PipeTaskExecutorManager() {
+  private PipeSubtaskExecutorManager() {
     assignerSubtaskExecutor = new PipeAssignerSubtaskExecutor();
     processorSubtaskExecutor = new PipeProcessorSubtaskExecutor();
     connectorSubtaskExecutor = new PipeConnectorSubtaskExecutor();
   }
 
   private static class PipeTaskExecutorHolder {
-    private static PipeTaskExecutorManager instance = null;
+    private static PipeSubtaskExecutorManager instance = null;
   }
 
-  public static PipeTaskExecutorManager setupAndGetInstance() {
+  public static synchronized PipeSubtaskExecutorManager getInstance() {
     if (PipeTaskExecutorHolder.instance == null) {
-      PipeTaskExecutorHolder.instance = new PipeTaskExecutorManager();
+      PipeTaskExecutorHolder.instance = new PipeSubtaskExecutorManager();
     }
     return PipeTaskExecutorHolder.instance;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
index 188bbac0e4..4f035ca671 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.scheduler;
 
-import org.apache.iotdb.db.pipe.execution.executor.PipeTaskExecutorManager;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 
 /**
  * PipeTaskScheduler is a singleton class that manages the numbers of threads used by
@@ -27,34 +27,34 @@ import org.apache.iotdb.db.pipe.execution.executor.PipeTaskExecutorManager;
  */
 public class PipeTaskScheduler {
 
-  private final PipeTaskExecutorManager pipeTaskExecutorManager =
-      PipeTaskExecutorManager.setupAndGetInstance();
+  private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
+      PipeSubtaskExecutorManager.getInstance();
 
   public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
     // TODO: make it configurable by setting different parameters
-    pipeTaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+    pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
   }
 
   public int getAssignerSubtaskExecutorThreadNum() {
-    return pipeTaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
+    return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
   }
 
   public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
     // TODO: make it configurable by setting different parameters
-    pipeTaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+    pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
   }
 
   public int getConnectorSubtaskExecutorThreadNum() {
-    return pipeTaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
+    return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
   }
 
   public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
     // TODO: make it configurable by setting different parameters
-    pipeTaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
+    pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
   }
 
   public int getProcessorSubtaskExecutorThreadNum() {
-    return pipeTaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
+    return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
   }
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
new file mode 100644
index 0000000000..b825784526
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeBuilder {
+
+  private final PipeMeta pipeMeta;
+
+  public PipeBuilder(PipeMeta pipeMeta) {
+    this.pipeMeta = pipeMeta;
+  }
+
+  public Map<TConsensusGroupId, PipeTask> build() {
+    final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
+    final String pipeName = pipeStaticMeta.getPipeName();
+    final PipeParameters collectorParameters = pipeStaticMeta.getCollectorParameters();
+    final PipeParameters processorParameters = pipeStaticMeta.getProcessorParameters();
+    final PipeParameters connectorParameters = pipeStaticMeta.getConnectorParameters();
+
+    final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
+
+    final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
+    for (Map.Entry<TConsensusGroupId, PipeTaskMeta> consensusGroupIdPipeTaskMeta :
+        pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
+      consensusGroupIdToPipeTaskMap.put(
+          consensusGroupIdPipeTaskMeta.getKey(),
+          new PipeTaskBuilder(
+                  pipeName,
+                  Integer.toString(consensusGroupIdPipeTaskMeta.getValue().getRegionLeader()),
+                  // TODO: consensusGroupIdPipeTaskMeta.getValue().getProgressIndex() is not used
+                  collectorParameters,
+                  processorParameters,
+                  connectorParameters)
+              .build());
+    }
+
+    return consensusGroupIdToPipeTaskMap;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 416c2c17fa..26475c5c4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -24,17 +24,20 @@ import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
 public class PipeTask {
 
   private final String pipeName;
+  private final String dataRegionId;
 
   private final PipeTaskStage collectorStage;
   private final PipeTaskStage processorStage;
   private final PipeTaskStage connectorStage;
 
-  public PipeTask(
+  PipeTask(
       String pipeName,
+      String dataRegionId,
       PipeTaskStage collectorStage,
       PipeTaskStage processorStage,
       PipeTaskStage connectorStage) {
     this.pipeName = pipeName;
+    this.dataRegionId = dataRegionId;
 
     this.collectorStage = collectorStage;
     this.processorStage = processorStage;
@@ -65,6 +68,10 @@ public class PipeTask {
     connectorStage.stop();
   }
 
+  public String getDataRegionId() {
+    return dataRegionId;
+  }
+
   public String getPipeName() {
     return pipeName;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 19e5f460ea..23fb60c9a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -19,5 +19,61 @@
 
 package org.apache.iotdb.db.pipe.task;
 
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+
+public class PipeTaskBuilder {
+
+  private final String pipeName;
+  private final String dataRegionId;
+  private final PipeParameters pipeCollectorParameters;
+  private final PipeParameters pipeProcessorParameters;
+  private final PipeParameters pipeConnectorParameters;
+
+  PipeTaskBuilder(
+      String pipeName,
+      String dataRegionId,
+      PipeParameters pipeCollectorParameters,
+      PipeParameters pipeProcessorParameters,
+      PipeParameters pipeConnectorParameters) {
+    this.pipeName = pipeName;
+    this.dataRegionId = dataRegionId;
+    this.pipeCollectorParameters = pipeCollectorParameters;
+    this.pipeProcessorParameters = pipeProcessorParameters;
+    this.pipeConnectorParameters = pipeConnectorParameters;
+  }
+
+  PipeTaskBuilder(String dataRegionId, PipeStaticMeta pipeStaticMeta) {
+    this(
+        pipeStaticMeta.getPipeName(),
+        dataRegionId,
+        pipeStaticMeta.getCollectorParameters(),
+        pipeStaticMeta.getProcessorParameters(),
+        pipeStaticMeta.getConnectorParameters());
+  }
+
+  public PipeTask build() {
+    // event flow: collector -> processor -> connector
+
+    // we first build the collector and connector, then build the processor.
+    final PipeTaskCollectorStage collectorStage =
+        new PipeTaskCollectorStage(dataRegionId, pipeCollectorParameters);
+    final PipeTaskConnectorStage connectorStage =
+        new PipeTaskConnectorStage(pipeConnectorParameters);
+
+    // the processor connects the collector and connector.
+    final PipeTaskProcessorStage processorStage =
+        new PipeTaskProcessorStage(
+            pipeName,
+            dataRegionId,
+            collectorStage.getEventSupplier(),
+            collectorStage.getCollectorPendingQueue(),
+            pipeProcessorParameters,
+            connectorStage.getPipeConnectorPendingQueue());
+
+    return new PipeTask(pipeName, dataRegionId, collectorStage, processorStage, connectorStage);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
new file mode 100644
index 0000000000..016b2537f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeTaskManager {
+
+  private final Map<PipeStaticMeta, Map<TConsensusGroupId, PipeTask>> pipeMap = new HashMap<>();
+
+  /** Add pipe task by pipe static meta and consensus group id. */
+  public synchronized void addPipeTask(
+      PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId, PipeTask pipeTask) {
+    pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>()).put(consensusGroupId, pipeTask);
+  }
+
+  /** Add pipe tasks by pipe static meta. */
+  public synchronized void addPipeTasks(
+      PipeStaticMeta pipeStaticMeta, Map<TConsensusGroupId, PipeTask> pipeTasks) {
+    pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>()).putAll(pipeTasks);
+  }
+
+  /**
+   * Remove pipe task by pipe static meta and consensus group id.
+   *
+   * @param pipeStaticMeta pipe static meta
+   * @param consensusGroupId consensus group id
+   * @return the pipe task if it exists, null otherwise
+   */
+  public synchronized PipeTask removePipeTask(
+      PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId) {
+    Map<TConsensusGroupId, PipeTask> consensusGroupIdPipeTaskMap = pipeMap.get(pipeStaticMeta);
+    if (consensusGroupIdPipeTaskMap != null) {
+      return consensusGroupIdPipeTaskMap.remove(consensusGroupId);
+    }
+    return null;
+  }
+
+  /**
+   * Remove pipe tasks by pipe static meta.
+   *
+   * @param pipeStaticMeta pipe static meta
+   * @return the pipe tasks if they exist, null otherwise
+   */
+  public synchronized Map<TConsensusGroupId, PipeTask> removePipeTasks(
+      PipeStaticMeta pipeStaticMeta) {
+    return pipeMap.remove(pipeStaticMeta);
+  }
+
+  /**
+   * Get pipe task by pipe static meta and consensus group id.
+   *
+   * @param pipeStaticMeta pipe static meta
+   * @param consensusGroupId consensus group id
+   * @return the pipe task if it exists, null otherwise
+   */
+  public synchronized PipeTask getPipeTask(
+      PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId) {
+    Map<TConsensusGroupId, PipeTask> consensusGroupIdPipeTaskMap = pipeMap.get(pipeStaticMeta);
+    if (consensusGroupIdPipeTaskMap != null) {
+      return consensusGroupIdPipeTaskMap.get(consensusGroupId);
+    }
+    return null;
+  }
+
+  /**
+   * Get pipe tasks by pipe static meta.
+   *
+   * @param pipeStaticMeta pipe static meta
+   * @return the pipe tasks if they exist, null otherwise
+   */
+  public synchronized Map<TConsensusGroupId, PipeTask> getPipeTasks(PipeStaticMeta pipeStaticMeta) {
+    return pipeMap.get(pipeStaticMeta);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
index 39ffaefeeb..ea056dc22a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/EventSupplier.java
@@ -17,21 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.queue;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
-  }
+@FunctionalInterface
+public interface EventSupplier {
 
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
-
-  @Override
-  public void close() {
-    // TODO
-  }
+  /**
+   * @return the event to be supplied. The event may be null if the collector has no events at the
+   *     moment but is still running and may supply more events later.
+   * @throws Exception if the supplier fails to supply the event.
+   */
+  Event supply() throws Exception;
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
similarity index 59%
rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index c3c5aedb86..ab3090a0ad 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutorTest.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -17,24 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.execution.executor;
+package org.apache.iotdb.db.pipe.task.queue;
 
-import org.apache.iotdb.db.pipe.task.subtask.PipeAssignerSubtask;
+import org.apache.iotdb.pipe.api.event.Event;
 
-import org.junit.Before;
-import org.mockito.Mockito;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 
-public class PipeAssignerSubtaskExecutorTest extends PipeSubtaskExecutorTest {
+public class ListenableBlockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
 
-  @Before
-  public void setUp() throws Exception {
-    executor = new PipeAssignerSubtaskExecutor();
-
-    subtask =
-        Mockito.spy(
-            new PipeAssignerSubtask("PipeAssignerSubtaskExecutorTest") {
-              @Override
-              public void executeForAWhile() {}
-            });
+  public ListenableBlockingPendingQueue(int pendingQueueSize) {
+    super(new BlockingArrayQueue<>(pendingQueueSize));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
new file mode 100644
index 0000000000..f476d88053
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.queue;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class ListenablePendingQueue<E extends Event> {
+
+  private final Queue<E> pendingQueue;
+
+  private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
+      new ConcurrentHashMap<>();
+
+  private final AtomicBoolean isFull = new AtomicBoolean(false);
+
+  protected ListenablePendingQueue(Queue<E> pendingQueue) {
+    this.pendingQueue = pendingQueue;
+  }
+
+  public ListenablePendingQueue<E> registerEmptyToNotEmptyListener(
+      String id, PendingQueueEmptyToNotEmptyListener listener) {
+    emptyToNotEmptyListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeEmptyToNotEmptyListener(String id) {
+    emptyToNotEmptyListeners.remove(id);
+  }
+
+  public void notifyEmptyToNotEmptyListeners() {
+    emptyToNotEmptyListeners
+        .values()
+        .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
+  }
+
+  public ListenablePendingQueue<E> registerNotEmptyToEmptyListener(
+      String id, PendingQueueNotEmptyToEmptyListener listener) {
+    notEmptyToEmptyListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeNotEmptyToEmptyListener(String id) {
+    notEmptyToEmptyListeners.remove(id);
+  }
+
+  public void notifyNotEmptyToEmptyListeners() {
+    notEmptyToEmptyListeners
+        .values()
+        .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
+  }
+
+  public ListenablePendingQueue<E> registerFullToNotFullListener(
+      String id, PendingQueueFullToNotFullListener listener) {
+    fullToNotFullListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeFullToNotFullListener(String id) {
+    fullToNotFullListeners.remove(id);
+  }
+
+  public void notifyFullToNotFullListeners() {
+    fullToNotFullListeners
+        .values()
+        .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+  }
+
+  public ListenablePendingQueue<E> registerNotFullToFullListener(
+      String id, PendingQueueNotFullToFullListener listener) {
+    notFullToFullListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeNotFullToFullListener(String id) {
+    notFullToFullListeners.remove(id);
+  }
+
+  public void notifyNotFullToFullListeners() {
+    notFullToFullListeners
+        .values()
+        .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+  }
+
+  public boolean offer(E event) {
+    final boolean isEmpty = pendingQueue.isEmpty();
+    final boolean isAdded = pendingQueue.offer(event);
+
+    if (isAdded) {
+      // we don't use size() == 1 to check whether the listener should be called,
+      // because offer() and size() are not atomic, and we don't want to use lock
+      // to make them atomic.
+      if (isEmpty) {
+        notifyEmptyToNotEmptyListeners();
+      }
+    } else {
+      if (isFull.compareAndSet(false, true)) {
+        notifyNotFullToFullListeners();
+      }
+    }
+
+    return isAdded;
+  }
+
+  public E poll() {
+    final boolean isEmpty = pendingQueue.isEmpty();
+    final E event = pendingQueue.poll();
+
+    if (event == null) {
+      // we don't use size() == 0 to check whether the listener should be called,
+      // because poll() and size() are not atomic, and we don't want to use lock
+      // to make them atomic.
+      if (!isEmpty) {
+        notifyNotEmptyToEmptyListeners();
+      }
+    } else {
+      if (isFull.compareAndSet(true, false)) {
+        notifyFullToNotFullListeners();
+      }
+    }
+
+    return event;
+  }
+
+  public void clear() {
+    pendingQueue.clear();
+  }
+
+  public int size() {
+    return pendingQueue.size();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
similarity index 71%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
index 39ffaefeeb..c2772b4eeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeAssignerSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
@@ -17,21 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task.subtask;
+package org.apache.iotdb.db.pipe.task.queue;
 
-public class PipeAssignerSubtask extends PipeSubtask {
+import org.apache.iotdb.pipe.api.event.Event;
 
-  public PipeAssignerSubtask(String taskID) {
-    super(taskID);
-  }
+import java.util.concurrent.ConcurrentLinkedQueue;
 
-  @Override
-  protected void executeForAWhile() {
-    // do nothing
-  }
+public class ListenableUnblockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
 
-  @Override
-  public void close() {
-    // TODO
+  public ListenableUnblockingPendingQueue() {
+    super(new ConcurrentLinkedQueue<>());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
index 19e5f460ea..d56b6e789e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueEmptyToNotEmptyListener.java
@@ -17,7 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
 
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueEmptyToNotEmptyListener {
+
+  void onPendingQueueEmptyToNotEmpty();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
index 19e5f460ea..33225505f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueFullToNotFullListener.java
@@ -17,7 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
 
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueFullToNotFullListener {
+
+  void onPendingQueueFullToNotFull();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
index 19e5f460ea..4225783739 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotEmptyToEmptyListener.java
@@ -17,7 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
 
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueNotEmptyToEmptyListener {
+
+  void onPendingQueueNotEmptyToEmpty();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
index 19e5f460ea..2433cd4b8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/PendingQueueNotFullToFullListener.java
@@ -17,7 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.task;
+package org.apache.iotdb.db.pipe.task.queue;
 
-/** PipeTaskBuilder is used to build a PipeTask. */
-public class PipeTaskBuilder {}
+@FunctionalInterface
+public interface PendingQueueNotFullToFullListener {
+
+  void onPendingQueueNotFullToFull();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 51a344d580..73c127f3da 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,29 +19,59 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskCollectorStage implements PipeTaskStage {
+public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeParameters collectorParameters;
 
+  /**
+   * TODO: have a better way to control busy/idle status of PipeTaskCollectorStage.
+   *
+   * <p>Currently, this field is for IoTDBDataRegionCollector only. IoTDBDataRegionCollector uses
+   * collectorPendingQueue as an internal data structure to store realtime events.
+   *
+   * <p>PendingQueue can detect whether the queue is empty or not, and it can notify the
+   * PipeTaskProcessorStage to stop processing data when the queue is empty to avoid unnecessary
+   * processing, and it can also notify the PipeTaskProcessorStage to start processing data when the
+   * queue is not empty.
+   */
+  private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
+
   private PipeCollector pipeCollector;
 
-  PipeTaskCollectorStage(PipeParameters collectorParameters) {
+  public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
     this.collectorParameters = collectorParameters;
+    // put the data region id into the collector parameters, so that the collector can read it
+    // from its own parameters
+    collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
   }
 
   @Override
-  public void create() throws PipeException {
-    this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
+  public void createSubtask() throws PipeException {
+    if (collectorParameters
+        .getStringOrDefault(
+            PipeCollectorConstant.COLLECTOR_KEY,
+            BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
+        .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
+      collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
+      this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+    } else {
+      this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
+    }
   }
 
   @Override
-  public void start() throws PipeException {
+  public void startSubtask() throws PipeException {
     try {
       pipeCollector.start();
     } catch (Exception e) {
@@ -50,12 +80,12 @@ public class PipeTaskCollectorStage implements PipeTaskStage {
   }
 
   @Override
-  public void stop() throws PipeException {
+  public void stopSubtask() throws PipeException {
     // collector continuously collects data, so do nothing in stop
   }
 
   @Override
-  public void drop() throws PipeException {
+  public void dropSubtask() throws PipeException {
     try {
       pipeCollector.close();
     } catch (Exception e) {
@@ -63,8 +93,11 @@ public class PipeTaskCollectorStage implements PipeTaskStage {
     }
   }
 
-  @Override
-  public PipeSubtask getSubtask() {
-    throw new UnsupportedOperationException("Collector stage does not have subtask.");
+  public EventSupplier getEventSupplier() {
+    return () -> pipeCollector.supply();
+  }
+
+  public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
+    return collectorPendingQueue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 0194dc5c8a..62cadbde87 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,114 +19,48 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
-import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskConnectorStage implements PipeTaskStage {
+public class PipeTaskConnectorStage extends PipeTaskStage {
 
-  protected final PipeConnectorSubtaskExecutor executor;
-  protected final PipeParameters connectorAttributes;
+  protected final PipeParameters pipeConnectorParameters;
 
-  protected PipeStatus status = null;
   protected String connectorSubtaskId = null;
-  protected boolean hasBeenExternallyStopped = false;
 
-  protected PipeTaskConnectorStage(
-      PipeConnectorSubtaskExecutor executor, PipeParameters connectorAttributes) {
-    this.executor = executor;
-    this.connectorAttributes = connectorAttributes;
+  public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
+    this.pipeConnectorParameters = pipeConnectorParameters;
   }
 
   @Override
-  public synchronized void create() throws PipeException {
-    if (status != null) {
-      if (status == PipeStatus.RUNNING) {
-        throw new PipeException(
-            String.format("The PipeConnectorSubtask %s has been started", connectorSubtaskId));
-      }
-      if (status == PipeStatus.DROPPED) {
-        throw new PipeException(
-            String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
-      }
-      // status == PipeStatus.STOPPED
-      if (hasBeenExternallyStopped) {
-        throw new PipeException(
-            String.format(
-                "The PipeConnectorSubtask %s has been externally stopped", connectorSubtaskId));
-      }
-      // otherwise, do nothing to allow retry strategy
-      return;
-    }
-
-    // status == null, register the connector
+  public void createSubtask() throws PipeException {
     connectorSubtaskId =
-        PipeConnectorSubtaskManager.instance().register(executor, connectorAttributes);
-    status = PipeStatus.STOPPED;
+        PipeConnectorSubtaskManager.instance()
+            .register(
+                PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
+                pipeConnectorParameters);
   }
 
   @Override
-  public synchronized void start() throws PipeException {
-    if (status == null) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
-    }
-    if (status == PipeStatus.RUNNING) {
-      // do nothing to allow retry strategy
-      return;
-    }
-    if (status == PipeStatus.DROPPED) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
-    }
-
-    // status == PipeStatus.STOPPED, start the connector
+  public void startSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
-    status = PipeStatus.RUNNING;
   }
 
   @Override
-  public synchronized void stop() throws PipeException {
-    if (status == null) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
-    }
-    if (status == PipeStatus.STOPPED) {
-      // do nothing to allow retry strategy
-      return;
-    }
-    if (status == PipeStatus.DROPPED) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has been dropped", connectorSubtaskId));
-    }
-
-    // status == PipeStatus.RUNNING, stop the connector
+  public void stopSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().stop(connectorSubtaskId);
-    status = PipeStatus.STOPPED;
-    hasBeenExternallyStopped = true;
   }
 
   @Override
-  public synchronized void drop() throws PipeException {
-    if (status == null) {
-      throw new PipeException(
-          String.format("The PipeConnectorSubtask %s has not been created", connectorSubtaskId));
-    }
-    if (status == PipeStatus.DROPPED) {
-      // do nothing to allow retry strategy
-      return;
-    }
-
-    // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+  public void dropSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
-    status = PipeStatus.DROPPED;
   }
 
-  @Override
-  public PipeSubtask getSubtask() {
-    return PipeConnectorSubtaskManager.instance().getPipeConnectorSubtask(connectorSubtaskId);
+  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+    return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index e7345be1e1..77bc2de5f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,45 +19,114 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public class PipeTaskProcessorStage implements PipeTaskStage {
+import javax.annotation.Nullable;
 
-  protected final PipeSubtaskExecutor executor;
-  protected final PipeSubtask subtask;
+public class PipeTaskProcessorStage extends PipeTaskStage {
 
-  protected PipeTaskProcessorStage(
-      PipeProcessorSubtaskExecutor executor, PipeProcessorSubtask subtask) {
-    this.executor = executor;
-    this.subtask = subtask;
+  protected final PipeProcessorSubtaskExecutor executor =
+      PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
+
+  protected final PipeProcessorSubtask subtask;
+
+  protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
+  protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
+
+  /**
+   * @param pipeName pipe name
+   * @param dataRegionId data region id
+   * @param pipeCollectorInputEventSupplier used to input events from pipe collector
+   * @param pipeCollectorInputPendingQueue used to listen for the pipe collector event queue
+   *     changing from empty to not empty or from not empty to empty; null means no need to listen
+   * @param pipeProcessorParameters used to create pipe processor
+   * @param pipeConnectorOutputPendingQueue used to output events to pipe connector
+   */
+  public PipeTaskProcessorStage(
+      String pipeName,
+      String dataRegionId,
+      EventSupplier pipeCollectorInputEventSupplier,
+      @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
+      PipeParameters pipeProcessorParameters,
+      ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+    final String taskId = pipeName + "_" + dataRegionId;
+    final PipeProcessor pipeProcessor =
+        PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+    final PipeEventCollector pipeConnectorOutputEventCollector =
+        new PipeEventCollector(pipeConnectorOutputPendingQueue);
+
+    this.subtask =
+        new PipeProcessorSubtask(
+            taskId,
+            pipeCollectorInputEventSupplier,
+            pipeProcessor,
+            pipeConnectorOutputEventCollector);
+
+    this.pipeCollectorInputPendingQueue =
+        pipeCollectorInputPendingQueue != null
+            ? pipeCollectorInputPendingQueue
+                .registerEmptyToNotEmptyListener(
+                    taskId,
+                    () -> {
+                      if (status == PipeStatus.RUNNING) {
+                        executor.start(subtask.getTaskID());
+                      }
+                    })
+                .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()))
+            : null;
+    this.pipeConnectorOutputPendingQueue =
+        pipeConnectorOutputPendingQueue
+            .registerNotFullToFullListener(taskId, () -> executor.stop(subtask.getTaskID()))
+            .registerFullToNotFullListener(
+                taskId,
+                () -> {
+                  // only start when the pipe is running
+                  if (status == PipeStatus.RUNNING) {
+                    pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
+                    executor.start(subtask.getTaskID());
+                  }
+                });
   }
 
   @Override
-  public void create() throws PipeException {
+  public void createSubtask() throws PipeException {
     executor.register(subtask);
   }
 
   @Override
-  public void start() throws PipeException {
+  public void startSubtask() throws PipeException {
     executor.start(subtask.getTaskID());
   }
 
   @Override
-  public void stop() throws PipeException {
+  public void stopSubtask() throws PipeException {
     executor.stop(subtask.getTaskID());
   }
 
   @Override
-  public void drop() throws PipeException {
-    executor.deregister(subtask.getTaskID());
-  }
+  public void dropSubtask() throws PipeException {
+    final String taskId = subtask.getTaskID();
 
-  @Override
-  public PipeSubtask getSubtask() {
-    return subtask;
+    if (pipeCollectorInputPendingQueue != null) {
+      pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
+      pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
+    }
+
+    pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
+    pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
+
+    executor.deregister(taskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 5f57fbabcb..e3a793c0d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -19,42 +19,113 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-public interface PipeTaskStage {
+public abstract class PipeTaskStage {
+
+  protected PipeStatus status = null;
+  protected boolean hasBeenExternallyStopped = false;
 
   /**
    * Create a pipe task stage.
    *
    * @throws PipeException if failed to create a pipe task stage.
    */
-  void create() throws PipeException;
+  public synchronized void create() {
+    if (status != null) {
+      if (status == PipeStatus.RUNNING) {
+        throw new PipeException("The PipeTaskStage has been started");
+      }
+      if (status == PipeStatus.DROPPED) {
+        throw new PipeException("The PipeTaskStage has been dropped");
+      }
+      // status == PipeStatus.STOPPED
+      if (hasBeenExternallyStopped) {
+        throw new PipeException("The PipeTaskStage has been externally stopped");
+      }
+      // otherwise, do nothing to allow retry strategy
+      return;
+    }
+
+    // status == null, create the subtask
+    createSubtask();
+
+    status = PipeStatus.STOPPED;
+  }
+
+  protected abstract void createSubtask() throws PipeException;
+
   /**
    * Start a pipe task stage.
    *
    * @throws PipeException if failed to start a pipe task stage.
    */
-  void start() throws PipeException;
+  public synchronized void start() {
+    if (status == null) {
+      throw new PipeException("The PipeTaskStage has not been created");
+    }
+    if (status == PipeStatus.RUNNING) {
+      // do nothing to allow retry strategy
+      return;
+    }
+    if (status == PipeStatus.DROPPED) {
+      throw new PipeException("The PipeTaskStage has been dropped");
+    }
+
+    // status == PipeStatus.STOPPED, start the subtask
+    startSubtask();
+
+    status = PipeStatus.RUNNING;
+  }
+
+  protected abstract void startSubtask() throws PipeException;
 
   /**
    * Stop a pipe task stage.
    *
    * @throws PipeException if failed to stop a pipe task stage.
    */
-  void stop() throws PipeException;
+  public synchronized void stop() {
+    if (status == null) {
+      throw new PipeException("The PipeTaskStage has not been created");
+    }
+    if (status == PipeStatus.STOPPED) {
+      // do nothing to allow retry strategy
+      return;
+    }
+    if (status == PipeStatus.DROPPED) {
+      throw new PipeException("The PipeTaskStage has been dropped");
+    }
+
+    // status == PipeStatus.RUNNING, stop the subtask
+    stopSubtask();
+
+    status = PipeStatus.STOPPED;
+    hasBeenExternallyStopped = true;
+  }
+
+  protected abstract void stopSubtask() throws PipeException;
 
   /**
    * Drop a pipe task stage.
    *
    * @throws PipeException if failed to drop a pipe task stage.
    */
-  void drop() throws PipeException;
+  public synchronized void drop() {
+    if (status == null) {
+      throw new PipeException("The PipeTaskStage has not been created");
+    }
+    if (status == PipeStatus.DROPPED) {
+      // do nothing to allow retry strategy
+      return;
+    }
 
-  /**
-   * Get the pipe subtask.
-   *
-   * @return the pipe subtask.
-   */
-  PipeSubtask getSubtask();
+    // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the subtask
+    dropSubtask();
+
+    status = PipeStatus.DROPPED;
+  }
+
+  protected abstract void dropSubtask() throws PipeException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index b420d9e51a..e2ceb64480 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,55 +19,60 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 public class PipeConnectorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  // input
-  private final ArrayBlockingQueue<Event> pendingQueue;
-  // output
-  private final PipeConnector pipeConnector;
+  private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
+  private final PipeConnector outputPipeConnector;
 
   /** @param taskID connectorAttributeSortedString */
-  public PipeConnectorSubtask(String taskID, PipeConnector pipeConnector) {
+  public PipeConnectorSubtask(
+      String taskID,
+      ListenableBlockingPendingQueue<Event> inputPendingQueue,
+      PipeConnector outputPipeConnector) {
     super(taskID);
-    // TODO: make the size of the queue size reasonable and configurable
-    this.pendingQueue = new ArrayBlockingQueue<>(1024 * 1024);
-    this.pipeConnector = pipeConnector;
-  }
-
-  public ArrayBlockingQueue<Event> getInputPendingQueue() {
-    return pendingQueue;
+    this.inputPendingQueue = inputPendingQueue;
+    this.outputPipeConnector = outputPipeConnector;
   }
 
   // TODO: for a while
   @Override
   protected void executeForAWhile() {
-    if (pendingQueue.isEmpty()) {
-      return;
+    try {
+      // TODO: reduce the frequency of heartbeat
+      outputPipeConnector.heartbeat();
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          "PipeConnector: failed to connect to the target system.", e);
     }
 
-    final Event event = pendingQueue.poll();
+    final Event event = inputPendingQueue.poll();
+    if (event == null) {
+      return;
+    }
 
     try {
       if (event instanceof TabletInsertionEvent) {
-        pipeConnector.transfer((TabletInsertionEvent) event);
+        outputPipeConnector.transfer((TabletInsertionEvent) event);
       } else if (event instanceof TsFileInsertionEvent) {
-        pipeConnector.transfer((TsFileInsertionEvent) event);
+        outputPipeConnector.transfer((TsFileInsertionEvent) event);
       } else if (event instanceof DeletionEvent) {
-        pipeConnector.transfer((DeletionEvent) event);
+        outputPipeConnector.transfer((DeletionEvent) event);
       } else {
         throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
       }
@@ -79,10 +84,50 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
   }
 
+  @Override
+  public void onFailure(@NotNull Throwable throwable) {
+    // retry to connect to the target system if the connection is broken
+    if (throwable instanceof PipeConnectionException) {
+      int retry = 0;
+      while (retry < MAX_RETRY_TIMES) {
+        try {
+          outputPipeConnector.handshake();
+          break;
+        } catch (Exception e) {
+          retry++;
+          LOGGER.error("Failed to reconnect to the target system, retrying... ({} time(s))", retry);
+          try {
+            // TODO: make the retry interval configurable
+            Thread.sleep(retry * 1000L);
+          } catch (InterruptedException interruptedException) {
+            LOGGER.info(
+                "Interrupted while sleeping, perhaps need to check whether the thread is interrupted.");
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
+      // times
+      if (retry == MAX_RETRY_TIMES) {
+        LOGGER.error(
+            "Failed to reconnect to the target system after {} times, stopping current pipe task {}...",
+            MAX_RETRY_TIMES,
+            taskID);
+        lastFailedCause = throwable;
+        PipeAgent.runtime().report(this);
+        return;
+      }
+    }
+
+    // handle other exceptions as usual
+    super.onFailure(throwable);
+  }
+
   @Override
   public void close() {
     try {
-      pipeConnector.close();
+      outputPipeConnector.close();
     } catch (Exception e) {
       e.printStackTrace();
       LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 34f8045cb6..3b7a59aa9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -30,34 +32,36 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 public class PipeProcessorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtask.class);
 
-  private final ArrayBlockingQueue<Event> pendingEventQueue;
+  private final EventSupplier inputEventSupplier;
   private final PipeProcessor pipeProcessor;
   private final EventCollector outputEventCollector;
 
   public PipeProcessorSubtask(
       String taskID,
-      ArrayBlockingQueue<Event> pendingEventQueue,
+      EventSupplier inputEventSupplier,
       PipeProcessor pipeProcessor,
       EventCollector outputEventCollector) {
     super(taskID);
+    this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
-    this.pendingEventQueue = pendingEventQueue;
     this.outputEventCollector = outputEventCollector;
   }
 
   @Override
-  protected void executeForAWhile() {
-    if (pendingEventQueue.isEmpty()) {
+  protected void executeForAWhile() throws Exception {
+    Event event = inputEventSupplier.supply();
+    if (event == null) {
       return;
     }
 
-    final Event event = pendingEventQueue.poll();
+    if (event instanceof PipeRealtimeCollectEvent) {
+      // unwrap the event carried by the realtime collect event
+      event = ((PipeRealtimeCollectEvent) event).getEvent();
+    }
 
     try {
       if (event instanceof TabletInsertionEvent) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 45b0331700..e01c139018 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -38,17 +38,17 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtask.class);
 
-  private final String taskID;
+  protected final String taskID;
 
   private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
   private ExecutorService subtaskCallbackListeningExecutor;
 
   private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
 
-  private static final int MAX_RETRY_TIMES = 5;
+  protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
 
-  private Throwable lastFailedCause;
+  protected Throwable lastFailedCause;
 
   private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index c453ee6af3..f376feb49c 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -70,7 +70,7 @@ public class CachedSchemaPatternMatcherTest {
             new HashMap<String, String>() {
               {
                 put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
-                put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                put(PipeCollectorConstant.DATA_REGION_KEY, "1");
               }
             }),
         null);
@@ -86,7 +86,7 @@ public class CachedSchemaPatternMatcherTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                  put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                 }
               }),
           null);
@@ -100,7 +100,7 @@ public class CachedSchemaPatternMatcherTest {
                 new HashMap<String, String>() {
                   {
                     put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
-                    put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, "1");
+                    put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                   }
                 }),
             null);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index f218e6865f..214b8b0774 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.EventType;
@@ -83,20 +84,20 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector();
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector();
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector();
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector()) {
+            new PipeRealtimeDataRegionHybridCollector(new ListenableUnblockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
           null);
@@ -105,7 +106,7 @@ public class PipeRealtimeCollectTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion1);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
           null);
@@ -114,7 +115,7 @@ public class PipeRealtimeCollectTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
           null);
@@ -123,7 +124,7 @@ public class PipeRealtimeCollectTest {
               new HashMap<String, String>() {
                 {
                   put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
-                  put(PipeCollectorConstant.PATTERN_DATA_REGION_KEY, dataRegion2);
+                  put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
           null);
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 6fbc7fe2c6..52acbe9c2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
+import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -36,9 +37,8 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
     subtask =
         Mockito.spy(
             new PipeConnectorSubtask(
-                "PipeConnectorSubtaskExecutorTest", mock(PipeConnector.class)) {
-              @Override
-              public void executeForAWhile() {}
-            });
+                "PipeConnectorSubtaskExecutorTest",
+                mock(ListenableBlockingPendingQueue.class),
+                mock(PipeConnector.class)));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index 1abf2ab05e..d0a5208d53 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
+import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -26,8 +27,6 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.junit.Before;
 import org.mockito.Mockito;
 
-import java.util.concurrent.ArrayBlockingQueue;
-
 import static org.mockito.Mockito.mock;
 
 public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
@@ -40,11 +39,8 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeProcessorSubtask(
                 "PipeProcessorSubtaskExecutorTest",
-                mock(ArrayBlockingQueue.class),
+                mock(EventSupplier.class),
                 mock(PipeProcessor.class),
-                mock(EventCollector.class)) {
-              @Override
-              public void executeForAWhile() {}
-            });
+                mock(EventCollector.class)));
   }
 }