You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/29 07:43:56 UTC

[iotdb] branch master updated: [IOTDB-5723] Pipe: progress index for simple consensus (#9961)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5307bbfd77c [IOTDB-5723] Pipe: progress index for simple consensus (#9961)
5307bbfd77c is described below

commit 5307bbfd77c597cda68d376214e279c4b7915146
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 29 15:43:47 2023 +0800

    [IOTDB-5723] Pipe: progress index for simple consensus (#9961)
    
    This commit introduces SimpleProgressIndex, the progress index for simple consensus (type SIMPLE_CONSENSUS_INDEX). See IOTDB-5723 for more details. It is related to #9446 and #9950.
    
    This commit also includes several additional fixes:
    
    * support reporting DataNode (dn) task runtime errors
    * consume events from the pending queue in a blocking way
    * clean the tsfile hardlink dir at startup
    * disable the construction of PipeAssignerSubtaskExecutor by default
---
 .../commons/consensus/index/ProgressIndexType.java |   6 +
 .../consensus/index/impl/SimpleProgressIndex.java  | 160 +++++++++++++++++++++
 .../db/engine/storagegroup/TsFileProcessor.java    |   5 +-
 .../db/engine/storagegroup/TsFileResource.java     |   4 +
 .../{PipeLauncher.java => PipeAgentLauncher.java}  |   6 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  33 +++--
 .../SimpleConsensusProgressIndexAssigner.java      | 112 +++++++++++++++
 .../apache/iotdb/db/pipe/config/PipeConfig.java    |   6 +
 .../core/collector/IoTDBDataRegionCollector.java   |   5 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |   8 +-
 .../listener/PipeInsertionDataNodeListener.java    |   7 +
 .../manager/PipeConnectorSubtaskLifeCycle.java     |   8 +-
 .../manager/PipeConnectorSubtaskManager.java       |  15 +-
 .../event/view/collector/PipeEventCollector.java   |   6 +-
 .../executor/PipeSubtaskExecutorManager.java       |   4 +-
 .../file/PipeHardlinkFileDirStartupCleaner.java    |  59 ++++++++
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |   3 +-
 .../task/queue/ListenableBlockingPendingQueue.java | 153 +++++++++++++++++++-
 ... => ListenableBoundedBlockingPendingQueue.java} |   5 +-
 .../db/pipe/task/queue/ListenablePendingQueue.java | 159 --------------------
 ...> ListenableUnboundedBlockingPendingQueue.java} |   9 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |   8 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  12 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  14 +-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  22 +--
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |   4 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  18 ++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  10 +-
 .../executor/PipeConnectorSubtaskExecutorTest.java |   6 +-
 .../executor/PipeProcessorSubtaskExecutorTest.java |   2 +
 31 files changed, 624 insertions(+), 247 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index 37d83a52efb..02afa4045df 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.consensus.index;
 
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
 public enum ProgressIndexType {
   MINIMUM_CONSENSUS_INDEX((short) 1),
   IOT_CONSENSUS_INDEX((short) 2),
+  SIMPLE_CONSENSUS_INDEX((short) 3),
   ;
 
   private final short type;
@@ -58,6 +60,8 @@ public enum ProgressIndexType {
         return MinimumProgressIndex.deserializeFrom(byteBuffer);
       case 2:
         return IoTProgressIndex.deserializeFrom(byteBuffer);
+      case 3:
+        return SimpleProgressIndex.deserializeFrom(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
@@ -71,6 +75,8 @@ public enum ProgressIndexType {
         return MinimumProgressIndex.deserializeFrom(stream);
       case 2:
         return IoTProgressIndex.deserializeFrom(stream);
+      case 3:
+        return SimpleProgressIndex.deserializeFrom(stream);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
new file mode 100644
index 00000000000..6571d90759a
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class SimpleProgressIndex implements ProgressIndex {
+
+  private final int rebootTimes;
+  private final long memtableFlushOrderId;
+
+  public SimpleProgressIndex(int rebootTimes, long memtableFlushOrderId) {
+    this.rebootTimes = rebootTimes;
+    this.memtableFlushOrderId = memtableFlushOrderId;
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(byteBuffer);
+
+    ReadWriteIOUtils.write(rebootTimes, byteBuffer);
+    ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(stream);
+
+    ReadWriteIOUtils.write(rebootTimes, stream);
+    ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    if (progressIndex instanceof MinimumProgressIndex) {
+      return true;
+    }
+
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return false;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex;
+    if (thisSimpleProgressIndex.rebootTimes > thatSimpleProgressIndex.rebootTimes) {
+      return true;
+    }
+    if (thisSimpleProgressIndex.rebootTimes < thatSimpleProgressIndex.rebootTimes) {
+      return false;
+    }
+    // thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes
+    return thisSimpleProgressIndex.memtableFlushOrderId
+        > thatSimpleProgressIndex.memtableFlushOrderId;
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return false;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex;
+    return thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes
+        && thisSimpleProgressIndex.memtableFlushOrderId
+            == thatSimpleProgressIndex.memtableFlushOrderId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof SimpleProgressIndex)) {
+      return false;
+    }
+    return this.equals((SimpleProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex progressIndex) {
+    if (!(progressIndex instanceof SimpleProgressIndex)) {
+      return this;
+    }
+
+    final SimpleProgressIndex thisSimpleProgressIndex = this;
+    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) progressIndex;
+    if (thisSimpleProgressIndex.rebootTimes > thatSimpleProgressIndex.rebootTimes) {
+      return this;
+    }
+    if (thisSimpleProgressIndex.rebootTimes < thatSimpleProgressIndex.rebootTimes) {
+      return progressIndex;
+    }
+    // thisSimpleProgressIndex.rebootTimes == thatSimpleProgressIndex.rebootTimes
+    if (thisSimpleProgressIndex.memtableFlushOrderId
+        > thatSimpleProgressIndex.memtableFlushOrderId) {
+      return this;
+    }
+    if (thisSimpleProgressIndex.memtableFlushOrderId
+        < thatSimpleProgressIndex.memtableFlushOrderId) {
+      return progressIndex;
+    }
+    // thisSimpleProgressIndex.memtableFlushOrderId == thatSimpleProgressIndex.memtableFlushOrderId
+    return this;
+  }
+
+  public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final int rebootTimes = ReadWriteIOUtils.readInt(byteBuffer);
+    final long memtableFlushOrderId = ReadWriteIOUtils.readLong(byteBuffer);
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
+  }
+
+  public static SimpleProgressIndex deserializeFrom(InputStream stream) throws IOException {
+    final int rebootTimes = ReadWriteIOUtils.readInt(stream);
+    final long memtableFlushOrderId = ReadWriteIOUtils.readLong(stream);
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId);
+  }
+
+  @Override
+  public String toString() {
+    return "SimpleProgressIndex{"
+        + "rebootTimes="
+        + rebootTimes
+        + ", memtableFlushOrderId="
+        + memtableFlushOrderId
+        + '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a124a8aa2c8..f8d752df5f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -833,13 +833,14 @@ public class TsFileProcessor {
         if (workMemTable != null) {
           logger.info(
               "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile "
-                  + "size: {}, plan index: [{}, {}]",
+                  + "size: {}, plan index: [{}, {}], progress index: {}",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
               workMemTable.memSize(),
               tsFileResource.getTsFileSize(),
               workMemTable.getMinPlanIndex(),
-              workMemTable.getMaxPlanIndex());
+              workMemTable.getMaxPlanIndex(),
+              tsFileResource.getMaxProgressIndex());
         } else {
           logger.info(
               "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 774106dd593..858f36373f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -1178,6 +1178,10 @@ public class TsFileResource {
       throw new IllegalStateException(
           "Should not get progress index from a unclosing TsFileResource.");
     }
+    return getMaxProgressIndex();
+  }
+
+  public ProgressIndex getMaxProgressIndex() {
     return maxProgressIndex == null ? new MinimumProgressIndex() : maxProgressIndex;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 617b95008e6..12fb4a66152 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -48,13 +48,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-class PipeLauncher {
+class PipeAgentLauncher {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeLauncher.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeAgentLauncher.class);
 
   private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
-  private PipeLauncher() {
+  private PipeAgentLauncher() {
     // forbidding instantiation
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5253d5a2db2..5c097b0c444 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 
@@ -38,14 +40,21 @@ public class PipeRuntimeAgent implements IService {
 
   private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
-  public synchronized void launchPipePluginAgent(
+  private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
+      new SimpleConsensusProgressIndexAssigner();
+
+  //////////////////////////// System Service Interface ////////////////////////////
+
+  public synchronized void preparePipeResources(
       ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
-    PipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+    PipeHardlinkFileDirStartupCleaner.clean();
+    PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
+    simpleConsensusProgressIndexAssigner.start();
   }
 
   @Override
   public synchronized void start() throws StartupException {
-    PipeLauncher.launchPipeTaskAgent();
+    PipeAgentLauncher.launchPipeTaskAgent();
 
     isShutdown.set(false);
   }
@@ -69,15 +78,15 @@ public class PipeRuntimeAgent implements IService {
     return ServiceType.PIPE_RUNTIME_AGENT;
   }
 
-  public void report(PipeSubtask subtask) {
-    // TODO: terminate the task by the given taskID
-    LOGGER.warn(
-        "Failed to execute task {} after many retries, last failed cause by {}",
-        subtask.getTaskID(),
-        subtask.getLastFailedCause());
+  ////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
+
+  public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) {
+    simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
   }
 
-  public void report(PipeRuntimeException pipeRuntimeException) {
-    // TODO: complete this method
+  //////////////////////////// Runtime Exception Handlers ////////////////////////////
+
+  public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
+    pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
new file mode 100644
index 00000000000..648c4994f07
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+
+public class SimpleConsensusProgressIndexAssigner {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SimpleConsensusProgressIndexAssigner.class);
+
+  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final String PIPE_SYSTEM_DIR =
+      IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+          + File.separator
+          + "pipe"
+          + File.separator;
+  private static final String REBOOT_TIMES_FILE_NAME = "reboot_times.txt";
+
+  private boolean isEnable = false;
+
+  private int rebootTimes = 0;
+  private final AtomicLong memtableFlushOrderId = new AtomicLong(0);
+
+  public void start() throws StartupException {
+    // only works for simple consensus
+    if (!IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
+      return;
+    }
+
+    isEnable = true;
+    LOGGER.info("Start SimpleConsensusProgressIndexAssigner ...");
+
+    try {
+      makeDirIfNecessary();
+      parseRebootTimes();
+      recordRebootTimes();
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void makeDirIfNecessary() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR);
+    if (file.exists() && file.isDirectory()) {
+      return;
+    }
+    FileUtils.forceMkdir(file);
+  }
+
+  private void parseRebootTimes() {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME);
+    if (!file.exists()) {
+      rebootTimes = 0;
+      return;
+    }
+    try {
+      String content = FileUtils.readFileToString(file, "UTF-8");
+      rebootTimes = Integer.parseInt(content);
+    } catch (IOException e) {
+      LOGGER.error("Cannot parse reboot times from file {}", file.getAbsolutePath(), e);
+      rebootTimes = 0;
+    }
+  }
+
+  private void recordRebootTimes() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + REBOOT_TIMES_FILE_NAME);
+    FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), "UTF-8");
+  }
+
+  public void assignIfNeeded(TsFileResource tsFileResource) {
+    if (!isEnable) {
+      return;
+    }
+
+    tsFileResource.updateProgressIndex(
+        new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement()));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
index 5690692e165..31eff23bc1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
@@ -47,6 +47,8 @@ public class PipeConfig {
 
   private final int readFileBufferSize = 8388608;
 
+  private final long pendingQueueMaxBlockingTimeMs = 1000;
+
   public int getDefaultRingBufferSize() {
     return defaultRingBufferSize;
   }
@@ -73,6 +75,10 @@ public class PipeConfig {
     return readFileBufferSize;
   }
 
+  public long getPendingQueueMaxBlockingTimeMs() {
+    return pendingQueueMaxBlockingTimeMs;
+  }
+
   /////////////////////////////// Singleton ///////////////////////////////
 
   private PipeConfig() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index fa7b07c182b..db5ece66fd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -46,7 +46,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   private int dataRegionId;
 
   public IoTDBDataRegionCollector(
-      PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
+      PipeTaskMeta pipeTaskMeta,
+      ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
     realtimeCollector =
         new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 4ec7ed92261..302594b1d02 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -42,10 +42,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   // TODO: memory control
   // This queue is used to store pending events collected by the method collect(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
-  private final ListenableUnblockingPendingQueue<Event> pendingQueue;
+  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
 
   public PipeRealtimeDataRegionHybridCollector(
-      PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event> pendingQueue) {
+      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(pipeTaskMeta);
     this.pendingQueue = pendingQueue;
   }
@@ -186,7 +186,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
                 "TsFile Event %s can not be supplied because the reference count can not be increased, "
                     + "the data represented by this event is lost",
                 event.getEvent());
-        PipeAgent.runtime().report(new PipeRuntimeNonCriticalException(errorMessage));
+        PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
         return null;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index d39a2fb9dc4..68b2c5ca010 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
@@ -45,6 +46,8 @@ public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
+  //////////////////////////// start & stop ////////////////////////////
+
   public synchronized void startListenAndAssign(
       String dataRegionId, PipeRealtimeDataRegionCollector collector) {
     dataRegionId2Assigner
@@ -69,12 +72,16 @@ public class PipeInsertionDataNodeListener {
     }
   }
 
+  //////////////////////////// listen to events ////////////////////////////
+
   // TODO: listen to the tsfile synced from the other cluster
   // TODO: check whether the method is called on the right place. what is the meaning of the
   // variable shouldClose before calling this method?
   // TODO: maximum the efficiency of the method when there is no pipe in the system, avoid
   // dataRegionId2Assigner.get(dataRegionId);
   public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) {
+    PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
+
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
     // only events from registered data region will be collected
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index a805844f855..8827bbb8d59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.core.connector.manager;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -28,7 +28,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
-  private final ListenableBlockingPendingQueue<Event> pendingQueue;
+  private final ListenableBoundedBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int aliveTaskCount;
@@ -36,7 +36,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
       PipeConnectorSubtask subtask,
-      ListenableBlockingPendingQueue<Event> pendingQueue) {
+      ListenableBoundedBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
@@ -59,7 +59,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
     return subtask;
   }
 
-  public ListenableBlockingPendingQueue<Event> getPendingQueue() {
+  public ListenableBoundedBlockingPendingQueue<Event> getPendingQueue() {
     return pendingQueue;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 06d9942f525..a1aeade92b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.db.pipe.core.connector.manager;
 
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -44,7 +45,9 @@ public class PipeConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
-      PipeConnectorSubtaskExecutor executor, PipeParameters pipeConnectorParameters) {
+      PipeConnectorSubtaskExecutor executor,
+      PipeParameters pipeConnectorParameters,
+      PipeTaskMeta taskMeta) {
     final String attributeSortedString =
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
@@ -74,10 +77,10 @@ public class PipeConnectorSubtaskManager {
 
       // TODO: make pendingQueue size configurable
       // 2. construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
-      final ListenableBlockingPendingQueue<Event> pendingQueue =
-          new ListenableBlockingPendingQueue<>(65535);
+      final ListenableBoundedBlockingPendingQueue<Event> pendingQueue =
+          new ListenableBoundedBlockingPendingQueue<>(1024);
       final PipeConnectorSubtask pipeConnectorSubtask =
-          new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
+          new PipeConnectorSubtask(attributeSortedString, taskMeta, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
           new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
       attributeSortedString2SubtaskLifeCycleMap.put(
@@ -127,7 +130,7 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getSubtask();
   }
 
-  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+  public ListenableBoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
       String attributeSortedString) {
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       throw new PipeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index c38b7c28a67..6f2a7d82591 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -29,7 +29,7 @@ import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
-  private final ListenableBlockingPendingQueue<Event> pendingQueue;
+  private final ListenableBoundedBlockingPendingQueue<Event> pendingQueue;
 
   // buffer queue is used to store events that are not offered to pending queue
   // because the pending queue is full. when pending queue is full, pending queue
@@ -39,7 +39,7 @@ public class PipeEventCollector implements EventCollector {
   // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
-  public PipeEventCollector(ListenableBlockingPendingQueue<Event> pendingQueue) {
+  public PipeEventCollector(ListenableBoundedBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
     bufferQueue = new LinkedList<>();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
index fc6a335ad0a..22905643a02 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
@@ -35,7 +35,7 @@ public class PipeSubtaskExecutorManager {
   private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor;
 
   public PipeAssignerSubtaskExecutor getAssignerSubtaskExecutor() {
-    return assignerSubtaskExecutor;
+    throw new UnsupportedOperationException("currently not needed");
   }
 
   public PipeProcessorSubtaskExecutor getProcessorSubtaskExecutor() {
@@ -49,7 +49,7 @@ public class PipeSubtaskExecutorManager {
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
   private PipeSubtaskExecutorManager() {
-    assignerSubtaskExecutor = new PipeAssignerSubtaskExecutor();
+    assignerSubtaskExecutor = null;
     processorSubtaskExecutor = new PipeProcessorSubtaskExecutor();
     connectorSubtaskExecutor = new PipeConnectorSubtaskExecutor();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
new file mode 100644
index 00000000000..77faae275e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.file;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.DirectoryFileFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class PipeHardlinkFileDirStartupCleaner {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeHardlinkFileDirStartupCleaner.class);
+
+  /**
+   * Delete every directory named PipeConfig.PIPE_TSFILE_DIR_NAME that is found, at any depth,
+   * under the configured data directories. The data directories themselves are kept.
+   */
+  public static void clean() {
+    for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
+      for (File file :
+          FileUtils.listFilesAndDirs(
+              new File(dataDir), DirectoryFileFilter.INSTANCE, DirectoryFileFilter.INSTANCE)) {
+        if (file.isDirectory() && file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+          LOGGER.info(
+              "pipe hardlink tsfile dir found, deleting it: {}, result: {}",
+              file,
+              FileUtils.deleteQuietly(file));
+        }
+      }
+    }
+  }
+
+  private PipeHardlinkFileDirStartupCleaner() {
+    // util class
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index d42d62e5f98..274b50e1a98 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -69,13 +69,14 @@ public class PipeTaskBuilder {
     final PipeTaskCollectorStage collectorStage =
         new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta, pipeCollectorParameters);
     final PipeTaskConnectorStage connectorStage =
-        new PipeTaskConnectorStage(pipeConnectorParameters);
+        new PipeTaskConnectorStage(pipeConnectorParameters, pipeTaskMeta);
 
     // the processor connects the collector and connector.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
             pipeName,
             dataRegionId,
+            pipeTaskMeta,
             collectorStage.getEventSupplier(),
             collectorStage.getCollectorPendingQueue(),
             pipeProcessorParameters,
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index ab3090a0add..fb007b239f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -19,13 +19,158 @@
 
 package org.apache.iotdb.db.pipe.task.queue;
 
+import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.pipe.api.event.Event;
 
-import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class ListenableBlockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-  public ListenableBlockingPendingQueue(int pendingQueueSize) {
-    super(new BlockingArrayQueue<>(pendingQueueSize));
+public abstract class ListenableBlockingPendingQueue<E extends Event> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ListenableBlockingPendingQueue.class);
+
+  private static final long MAX_BLOCKING_TIME_MS =
+      PipeConfig.getInstance().getPendingQueueMaxBlockingTimeMs();
+
+  private final BlockingQueue<E> pendingQueue;
+
+  private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
+      new ConcurrentHashMap<>();
+  private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
+      new ConcurrentHashMap<>();
+
+  private final AtomicBoolean isFull = new AtomicBoolean(false);
+
+  protected ListenableBlockingPendingQueue(BlockingQueue<E> pendingQueue) {
+    this.pendingQueue = pendingQueue;
+  }
+
+  public ListenableBlockingPendingQueue<E> registerEmptyToNotEmptyListener(
+      String id, PendingQueueEmptyToNotEmptyListener listener) {
+    emptyToNotEmptyListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeEmptyToNotEmptyListener(String id) {
+    emptyToNotEmptyListeners.remove(id);
+  }
+
+  public void notifyEmptyToNotEmptyListeners() {
+    emptyToNotEmptyListeners
+        .values()
+        .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
+  }
+
+  public ListenableBlockingPendingQueue<E> registerNotEmptyToEmptyListener(
+      String id, PendingQueueNotEmptyToEmptyListener listener) {
+    notEmptyToEmptyListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeNotEmptyToEmptyListener(String id) {
+    notEmptyToEmptyListeners.remove(id);
+  }
+
+  public void notifyNotEmptyToEmptyListeners() {
+    notEmptyToEmptyListeners
+        .values()
+        .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
+  }
+
+  public ListenableBlockingPendingQueue<E> registerFullToNotFullListener(
+      String id, PendingQueueFullToNotFullListener listener) {
+    fullToNotFullListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeFullToNotFullListener(String id) {
+    fullToNotFullListeners.remove(id);
+  }
+
+  public void notifyFullToNotFullListeners() {
+    fullToNotFullListeners
+        .values()
+        .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
+  }
+
+  public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
+      String id, PendingQueueNotFullToFullListener listener) {
+    notFullToFullListeners.put(id, listener);
+    return this;
+  }
+
+  public void removeNotFullToFullListener(String id) {
+    notFullToFullListeners.remove(id);
+  }
+
+  public void notifyNotFullToFullListeners() {
+    notFullToFullListeners
+        .values()
+        .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
+  }
+
+  public boolean offer(E event) {
+    final boolean isEmpty = pendingQueue.isEmpty();
+    final boolean isAdded = pendingQueue.offer(event);
+
+    if (isAdded) {
+      // we don't use size() == 1 to check whether the listener should be called,
+      // because offer() and size() are not atomic, and we don't want to use lock
+      // to make them atomic.
+      if (isEmpty) {
+        notifyEmptyToNotEmptyListeners();
+      }
+    } else {
+      if (isFull.compareAndSet(false, true)) {
+        notifyNotFullToFullListeners();
+      }
+    }
+
+    return isAdded;
+  }
+
+  public E poll() {
+    final boolean isEmpty = pendingQueue.isEmpty();
+    E event = null;
+    try {
+      event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.info("pending queue poll is interrupted.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    if (event == null) {
+      // we don't use size() == 0 to check whether the listener should be called,
+      // because poll() and size() are not atomic, and we don't want to use lock
+      // to make them atomic.
+      if (!isEmpty) {
+        notifyNotEmptyToEmptyListeners();
+      }
+    } else {
+      if (isFull.compareAndSet(true, false)) {
+        notifyFullToNotFullListeners();
+      }
+    }
+
+    return event;
+  }
+
+  public void clear() {
+    pendingQueue.clear();
+  }
+
+  public int size() {
+    return pendingQueue.size();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
similarity index 84%
copy from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
index ab3090a0add..da91cc4187b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBoundedBlockingPendingQueue.java
@@ -23,9 +23,10 @@ import org.apache.iotdb.pipe.api.event.Event;
 
 import org.eclipse.jetty.util.BlockingArrayQueue;
 
-public class ListenableBlockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+public class ListenableBoundedBlockingPendingQueue<E extends Event>
+    extends ListenableBlockingPendingQueue<E> {
 
-  public ListenableBlockingPendingQueue(int pendingQueueSize) {
+  public ListenableBoundedBlockingPendingQueue(int pendingQueueSize) {
     super(new BlockingArrayQueue<>(pendingQueueSize));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
deleted file mode 100644
index f476d880533..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenablePendingQueue.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.task.queue;
-
-import org.apache.iotdb.pipe.api.event.Event;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class ListenablePendingQueue<E extends Event> {
-
-  private final Queue<E> pendingQueue;
-
-  private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners =
-      new ConcurrentHashMap<>();
-
-  private final AtomicBoolean isFull = new AtomicBoolean(false);
-
-  protected ListenablePendingQueue(Queue<E> pendingQueue) {
-    this.pendingQueue = pendingQueue;
-  }
-
-  public ListenablePendingQueue<E> registerEmptyToNotEmptyListener(
-      String id, PendingQueueEmptyToNotEmptyListener listener) {
-    emptyToNotEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeEmptyToNotEmptyListener(String id) {
-    emptyToNotEmptyListeners.remove(id);
-  }
-
-  public void notifyEmptyToNotEmptyListeners() {
-    emptyToNotEmptyListeners
-        .values()
-        .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
-  }
-
-  public ListenablePendingQueue<E> registerNotEmptyToEmptyListener(
-      String id, PendingQueueNotEmptyToEmptyListener listener) {
-    notEmptyToEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotEmptyToEmptyListener(String id) {
-    notEmptyToEmptyListeners.remove(id);
-  }
-
-  public void notifyNotEmptyToEmptyListeners() {
-    notEmptyToEmptyListeners
-        .values()
-        .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
-  }
-
-  public ListenablePendingQueue<E> registerFullToNotFullListener(
-      String id, PendingQueueFullToNotFullListener listener) {
-    fullToNotFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeFullToNotFullListener(String id) {
-    fullToNotFullListeners.remove(id);
-  }
-
-  public void notifyFullToNotFullListeners() {
-    fullToNotFullListeners
-        .values()
-        .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
-  }
-
-  public ListenablePendingQueue<E> registerNotFullToFullListener(
-      String id, PendingQueueNotFullToFullListener listener) {
-    notFullToFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotFullToFullListener(String id) {
-    notFullToFullListeners.remove(id);
-  }
-
-  public void notifyNotFullToFullListeners() {
-    notFullToFullListeners
-        .values()
-        .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
-  }
-
-  public boolean offer(E event) {
-    final boolean isEmpty = pendingQueue.isEmpty();
-    final boolean isAdded = pendingQueue.offer(event);
-
-    if (isAdded) {
-      // we don't use size() == 1 to check whether the listener should be called,
-      // because offer() and size() are not atomic, and we don't want to use lock
-      // to make them atomic.
-      if (isEmpty) {
-        notifyEmptyToNotEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(false, true)) {
-        notifyNotFullToFullListeners();
-      }
-    }
-
-    return isAdded;
-  }
-
-  public E poll() {
-    final boolean isEmpty = pendingQueue.isEmpty();
-    final E event = pendingQueue.poll();
-
-    if (event == null) {
-      // we don't use size() == 0 to check whether the listener should be called,
-      // because poll() and size() are not atomic, and we don't want to use lock
-      // to make them atomic.
-      if (!isEmpty) {
-        notifyNotEmptyToEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(true, false)) {
-        notifyFullToNotFullListeners();
-      }
-    }
-
-    return event;
-  }
-
-  public void clear() {
-    pendingQueue.clear();
-  }
-
-  public int size() {
-    return pendingQueue.size();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
index c2772b4eeb0..63611067190 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnblockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableUnboundedBlockingPendingQueue.java
@@ -21,11 +21,12 @@ package org.apache.iotdb.db.pipe.task.queue;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
-public class ListenableUnblockingPendingQueue<E extends Event> extends ListenablePendingQueue<E> {
+public class ListenableUnboundedBlockingPendingQueue<E extends Event>
+    extends ListenableBlockingPendingQueue<E> {
 
-  public ListenableUnblockingPendingQueue() {
-    super(new ConcurrentLinkedQueue<>());
+  public ListenableUnboundedBlockingPendingQueue() {
+    super(new LinkedBlockingQueue<>());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 3dd445c7489..8146a3a3f28 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -51,7 +51,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
    * processing, and it also can notify the PipeTaskProcessorStage to start processing data when the
    * queue is not empty.
    */
-  private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
+  private ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
 
   private final PipeCollector pipeCollector;
 
@@ -76,7 +76,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
           .getAttribute()
           .put(PipeCollectorConstant.DATA_REGION_KEY, String.valueOf(dataRegionId.getId()));
 
-      collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
+      collectorPendingQueue = new ListenableUnboundedBlockingPendingQueue<>();
       this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta, collectorPendingQueue);
     } else {
       this.collectorParameters = collectorParameters;
@@ -128,7 +128,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
     return pipeCollector::supply;
   }
 
-  public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
+  public ListenableUnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
     return collectorPendingQueue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 7f147b130bf..98e8ba7e965 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,9 +19,10 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -30,15 +31,16 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
 
   protected final PipeParameters pipeConnectorParameters;
 
-  protected String connectorSubtaskId = null;
+  protected String connectorSubtaskId;
 
-  public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
+  public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters, PipeTaskMeta taskMeta) {
     this.pipeConnectorParameters = pipeConnectorParameters;
     connectorSubtaskId =
         PipeConnectorSubtaskManager.instance()
             .register(
                 PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
-                pipeConnectorParameters); // TODO: should split to create
+                pipeConnectorParameters,
+                taskMeta);
   }
 
   @Override
@@ -59,7 +61,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
     PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
   }
 
-  public ListenableBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+  public ListenableBoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
     return PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 5542b943d59..40a1b7d7be0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -21,13 +21,14 @@ package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -47,12 +48,13 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   protected final PipeProcessor pipeProcessor;
   protected final PipeProcessorSubtask pipeProcessorSubtask;
 
-  protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
-  protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
+  protected final ListenableBlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
+  protected final ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
 
   /**
    * @param pipeName pipe name
    * @param dataRegionId data region id
+   * @param taskMeta pipe task meta
    * @param pipeCollectorInputEventSupplier used to input events from pipe collector
    * @param pipeCollectorInputPendingQueue used to listen whether pipe collector event queue is from
    *     empty to not empty or from not empty to empty, null means no need to listen
@@ -62,10 +64,11 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   public PipeTaskProcessorStage(
       String pipeName,
       TConsensusGroupId dataRegionId,
+      PipeTaskMeta taskMeta,
       EventSupplier pipeCollectorInputEventSupplier,
-      @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
+      @Nullable ListenableBlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
-      ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+      ListenableBoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     this.pipeProcessorParameters = pipeProcessorParameters;
 
     final String taskId = pipeName + "_" + dataRegionId;
@@ -76,6 +79,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
+            taskMeta,
             pipeCollectorInputEventSupplier,
             pipeProcessor,
             pipeConnectorOutputEventCollector);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index df0ea7d1fcd..4cbcb7e731f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,14 +19,16 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
 
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
@@ -36,7 +38,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
-  private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
+  private final ListenableBoundedBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
   private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
@@ -45,9 +47,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
       String taskID,
-      ListenableBlockingPendingQueue<Event> inputPendingQueue,
+      PipeTaskMeta taskMeta,
+      ListenableBoundedBlockingPendingQueue<Event> inputPendingQueue,
       PipeConnector outputPipeConnector) {
-    super(taskID);
+    super(taskID, taskMeta);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
     executeOnceInvokedTimes = 0;
@@ -119,13 +122,14 @@ public class PipeConnectorSubtask extends PipeSubtask {
       // stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
       // times
       if (retry == MAX_RETRY_TIMES) {
-        LOGGER.error(
-            "Failed to reconnect to the target system after {} times, stopping current pipe task {}...",
-            MAX_RETRY_TIMES,
-            taskID);
+        final String errorMessage =
+            String.format(
+                "Failed to reconnect to the target system after %d times, stopping current pipe task %s...",
+                MAX_RETRY_TIMES, taskID);
+        LOGGER.error(errorMessage);
         lastFailedCause = throwable;
 
-        PipeAgent.runtime().report(this);
+        PipeAgent.runtime().report(taskMeta, new PipeRuntimeCriticalException(errorMessage));
 
         // although the pipe task will be stopped, we still don't release the last event here
         // because we need to keep it for the next retry. if user wants to restart the task,
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 6a76beb02b3..94bcb248911 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -40,10 +41,11 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
   public PipeProcessorSubtask(
       String taskID,
+      PipeTaskMeta taskMeta,
       EventSupplier inputEventSupplier,
       PipeProcessor pipeProcessor,
       EventCollector outputEventCollector) {
-    super(taskID);
+    super(taskID, taskMeta);
     this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 5ddff7b2f29..85fb197bbef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -42,6 +44,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtask.class);
 
   protected final String taskID;
+  protected final PipeTaskMeta taskMeta;
 
   private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
   private ExecutorService subtaskCallbackListeningExecutor;
@@ -56,9 +59,10 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   protected Event lastEvent;
 
-  public PipeSubtask(String taskID) {
+  protected PipeSubtask(String taskID, PipeTaskMeta taskMeta) {
     super();
     this.taskID = taskID;
+    this.taskMeta = taskMeta;
   }
 
   public void bindExecutors(
@@ -110,14 +114,14 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
       retryCount.incrementAndGet();
       submitSelf();
     } else {
-      LOGGER.warn(
-          "Subtask {} failed, has been retried for {} times, last failed because of {}",
-          taskID,
-          retryCount,
-          throwable);
+      final String errorMessage =
+          String.format(
+              "Subtask %s failed, has been retried for %d times, last failed because of %s",
+              taskID, retryCount.get(), throwable);
+      LOGGER.warn(errorMessage);
       lastFailedCause = throwable;
 
-      PipeAgent.runtime().report(this);
+      PipeAgent.runtime().report(taskMeta, new PipeRuntimeCriticalException(errorMessage));
 
       // although the pipe task will be stopped, we still don't release the last event here
       // because we need to keep it for the next retry. if user wants to restart the task,
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index db28843d7c7..e153f902fbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -836,7 +836,7 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void preparePipeResources() throws StartupException {
-    PipeAgent.runtime().launchPipePluginAgent(resourcesInformationHolder);
+    PipeAgent.runtime().preparePipeResources(resourcesInformationHolder);
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index fbb399831eb..02ef1150a84 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -103,16 +103,16 @@ public class PipeRealtimeCollectTest {
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
             new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnblockingPendingQueue<>());
+                null, new ListenableUnboundedBlockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
             new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnblockingPendingQueue<>());
+                null, new ListenableUnboundedBlockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
             new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnblockingPendingQueue<>());
+                null, new ListenableUnboundedBlockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
             new PipeRealtimeDataRegionHybridCollector(
-                null, new ListenableUnblockingPendingQueue<>())) {
+                null, new ListenableUnboundedBlockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index 52acbe9c2ff..43d1f2eefb8 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,8 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -38,7 +39,8 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeConnectorSubtask(
                 "PipeConnectorSubtaskExecutorTest",
-                mock(ListenableBlockingPendingQueue.class),
+                mock(PipeTaskMeta.class),
+                mock(ListenableBoundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index d0a5208d537..c20eab04cbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -39,6 +40,7 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeProcessorSubtask(
                 "PipeProcessorSubtaskExecutorTest",
+                mock(PipeTaskMeta.class),
                 mock(EventSupplier.class),
                 mock(PipeProcessor.class),
                 mock(EventCollector.class)));