You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/31 18:44:51 UTC

[iotdb] branch master updated: [IOTDB-5956] Pipe: Fix bugs in IoTDBThriftReceiverV1, PipeMetaSync (#10007)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f541d22f2 [IOTDB-5956] Pipe: Fix bugs in IoTDBThriftReceiverV1, PipeMetaSync (#10007)
81f541d22f2 is described below

commit 81f541d22f2b4e642a8a330643709a067173cc32
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Thu Jun 1 02:44:44 2023 +0800

    [IOTDB-5956] Pipe: Fix bugs in IoTDBThriftReceiverV1, PipeMetaSync (#10007)
    
    * Fix bugs in pipeReceiveFileDir handling
    
    * Fix bug where ConnectorSubtask reported exceptions to the wrong PipeTaskMeta
---
 .../response/pipe/task/PipeTableResp.java          |  4 ++
 .../runtime/PipeHandleMetaChangeProcedure.java     | 48 +++++++++++--
 .../PipeRuntimeConnectorCriticalException.java     | 73 +++++++++++++++++++
 .../pipe}/PipeRuntimeCriticalException.java        | 36 +++++++++-
 .../exception/pipe}/PipeRuntimeException.java      | 11 ++-
 .../exception/pipe/PipeRuntimeExceptionType.java   | 82 ++++++++++++++++++++++
 .../pipe}/PipeRuntimeNonCriticalException.java     | 36 +++++++++-
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 35 ++++-----
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  1 +
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  2 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  4 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |  2 +-
 .../PipeRealtimeDataRegionLogCollector.java        |  2 +-
 .../PipeRealtimeDataRegionTsFileCollector.java     |  2 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      | 10 ++-
 .../manager/PipeConnectorSubtaskManager.java       |  7 +-
 .../iotdb/db/pipe/core/event/EnrichedEvent.java    |  6 ++
 .../event/realtime/PipeRealtimeCollectEvent.java   | 18 ++++-
 .../db/pipe/resource/wal/PipeWALResource.java      |  4 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  3 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  6 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  4 --
 .../db/pipe/task/subtask/PipeConnectorSubtask.java | 19 ++---
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  4 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    | 23 +++---
 .../executor/PipeConnectorSubtaskExecutorTest.java |  2 -
 .../executor/PipeProcessorSubtaskExecutorTest.java |  2 -
 27 files changed, 364 insertions(+), 82 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 2d6f401eefb..3153739629f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -44,6 +44,10 @@ public class PipeTableResp implements DataSet {
     this.allPipeMeta = allPipeMeta;
   }
 
+  public List<PipeMeta> getAllPipeMeta() {
+    return allPipeMeta;
+  }
+
   public PipeTableResp filter(Boolean whereClause, String pipeName) {
     if (whereClause == null) {
       if (pipeName == null) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index b63e157f096..8f9f9207ebc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -20,19 +20,21 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeManagementException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -148,10 +150,46 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
         for (final PipeRuntimeException exception :
             runtimeMetaFromDataNode.getExceptionMessages()) {
           pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
+
           if (exception instanceof PipeRuntimeCriticalException) {
-            pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
-            needWriteConsensusOnConfigNodes = true;
-            needPushPipeMetaToDataNodes = true;
+            final String pipeName = pipeMetaOnConfigNode.getStaticMeta().getPipeName();
+            if (!pipeMetaOnConfigNode
+                .getRuntimeMeta()
+                .getStatus()
+                .get()
+                .equals(PipeStatus.STOPPED)) {
+              pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+              needWriteConsensusOnConfigNodes = true;
+              needPushPipeMetaToDataNodes = true;
+
+              LOGGER.warn(
+                  String.format(
+                      "Detect PipeRuntimeCriticalException %s from DataNode, stop pipe %s.",
+                      exception, pipeName));
+            }
+
+            if (exception instanceof PipeRuntimeConnectorCriticalException) {
+              ((PipeTableResp)
+                      env.getConfigManager()
+                          .getPipeManager()
+                          .getPipeTaskCoordinator()
+                          .getPipeTaskInfo()
+                          .showPipes())
+                  .filter(true, pipeName).getAllPipeMeta().stream()
+                      .map(pipeMeta -> pipeMeta.getRuntimeMeta().getStatus())
+                      .filter(status -> !status.get().equals(PipeStatus.STOPPED))
+                      .forEach(
+                          status -> {
+                            status.set(PipeStatus.STOPPED);
+                            needWriteConsensusOnConfigNodes = true;
+                            needPushPipeMetaToDataNodes = true;
+
+                            LOGGER.warn(
+                                String.format(
+                                    "Detect PipeRuntimeConnectorCriticalException %s from DataNode, stop pipe %s.",
+                                    exception, pipeName));
+                          });
+            }
           }
         }
       }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
new file mode 100644
index 00000000000..615f3725457
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeConnectorCriticalException.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception.pipe;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeRuntimeConnectorCriticalException extends PipeRuntimeCriticalException {
+  public PipeRuntimeConnectorCriticalException(String message) {
+    super(message);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof PipeRuntimeConnectorCriticalException
+        && Objects.equals(getMessage(), ((PipeRuntimeConnectorCriticalException) obj).getMessage());
+  }
+
+  @Override
+  public int hashCode() {
+    return getMessage().hashCode();
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(byteBuffer);
+    ReadWriteIOUtils.write(getMessage(), byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    PipeRuntimeExceptionType.CONNECTOR_CRITICAL_EXCEPTION.serialize(stream);
+    ReadWriteIOUtils.write(getMessage(), stream);
+  }
+
+  public static PipeRuntimeConnectorCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+    final String message = ReadWriteIOUtils.readString(byteBuffer);
+    return new PipeRuntimeConnectorCriticalException(message);
+  }
+
+  public static PipeRuntimeConnectorCriticalException deserializeFrom(InputStream stream)
+      throws IOException {
+    final String message = ReadWriteIOUtils.readString(stream);
+    return new PipeRuntimeConnectorCriticalException(message);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeRuntimeConnectorCriticalException{ message: " + getMessage() + " }";
+  }
+}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeCriticalException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
similarity index 52%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeCriticalException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
index fbe73118385..b6bd3c11a1c 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeCriticalException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeCriticalException.java
@@ -17,8 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class PipeRuntimeCriticalException extends PipeRuntimeException {
@@ -37,4 +43,32 @@ public class PipeRuntimeCriticalException extends PipeRuntimeException {
   public int hashCode() {
     return getMessage().hashCode();
   }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(byteBuffer);
+    ReadWriteIOUtils.write(getMessage(), byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    PipeRuntimeExceptionType.CRITICAL_EXCEPTION.serialize(stream);
+    ReadWriteIOUtils.write(getMessage(), stream);
+  }
+
+  public static PipeRuntimeCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+    final String message = ReadWriteIOUtils.readString(byteBuffer);
+    return new PipeRuntimeCriticalException(message);
+  }
+
+  public static PipeRuntimeCriticalException deserializeFrom(InputStream stream)
+      throws IOException {
+    final String message = ReadWriteIOUtils.readString(stream);
+    return new PipeRuntimeCriticalException(message);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeRuntimeCriticalException{ message: " + getMessage() + " }";
+  }
 }
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
similarity index 79%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
index 9b20dff7bff..9dd4dec5bac 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeException.java
@@ -17,8 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public abstract class PipeRuntimeException extends PipeException {
@@ -37,4 +42,8 @@ public abstract class PipeRuntimeException extends PipeException {
   public int hashCode() {
     return Objects.hash(getMessage());
   }
+
+  public abstract void serialize(ByteBuffer byteBuffer);
+
+  public abstract void serialize(OutputStream stream) throws IOException;
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
new file mode 100644
index 00000000000..17d6ec37827
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeExceptionType.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception.pipe;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public enum PipeRuntimeExceptionType {
+  NON_CRITICAL_EXCEPTION((short) 1),
+  CRITICAL_EXCEPTION((short) 2),
+  CONNECTOR_CRITICAL_EXCEPTION((short) 3),
+  ;
+
+  private final short type;
+
+  PipeRuntimeExceptionType(short type) {
+    this.type = type;
+  }
+
+  public short getType() {
+    return type;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(type, byteBuffer);
+  }
+
+  public void serialize(OutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(type, stream);
+  }
+
+  public static PipeRuntimeException deserializeFrom(ByteBuffer byteBuffer) {
+    final short type = ReadWriteIOUtils.readShort(byteBuffer);
+    switch (type) {
+      case 1:
+        return PipeRuntimeNonCriticalException.deserializeFrom(byteBuffer);
+      case 2:
+        return PipeRuntimeCriticalException.deserializeFrom(byteBuffer);
+      case 3:
+        return PipeRuntimeConnectorCriticalException.deserializeFrom(byteBuffer);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported PipeRuntimeException type %s.", type));
+    }
+  }
+
+  public static PipeRuntimeException deserializeFrom(InputStream stream) throws IOException {
+    final short type = ReadWriteIOUtils.readShort(stream);
+    switch (type) {
+      case 1:
+        return PipeRuntimeNonCriticalException.deserializeFrom(stream);
+      case 2:
+        return PipeRuntimeCriticalException.deserializeFrom(stream);
+      case 3:
+        return PipeRuntimeConnectorCriticalException.deserializeFrom(stream);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported PipeRuntimeException type %s.", type));
+    }
+  }
+}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeNonCriticalException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
similarity index 51%
rename from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeNonCriticalException.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
index 6fa63b93cc1..4bc7345924f 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeRuntimeNonCriticalException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeNonCriticalException.java
@@ -17,8 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.api.exception;
+package org.apache.iotdb.commons.exception.pipe;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class PipeRuntimeNonCriticalException extends PipeRuntimeException {
@@ -37,4 +43,32 @@ public class PipeRuntimeNonCriticalException extends PipeRuntimeException {
   public int hashCode() {
     return getMessage().hashCode();
   }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    PipeRuntimeExceptionType.NON_CRITICAL_EXCEPTION.serialize(byteBuffer);
+    ReadWriteIOUtils.write(getMessage(), byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    PipeRuntimeExceptionType.NON_CRITICAL_EXCEPTION.serialize(stream);
+    ReadWriteIOUtils.write(getMessage(), stream);
+  }
+
+  public static PipeRuntimeNonCriticalException deserializeFrom(ByteBuffer byteBuffer) {
+    final String message = ReadWriteIOUtils.readString(byteBuffer);
+    return new PipeRuntimeNonCriticalException(message);
+  }
+
+  public static PipeRuntimeNonCriticalException deserializeFrom(InputStream stream)
+      throws IOException {
+    final String message = ReadWriteIOUtils.readString(stream);
+    return new PipeRuntimeNonCriticalException(message);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeRuntimeNonCriticalException{ message: " + getMessage() + " }";
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index d4f43d15475..46547704d08 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -21,9 +21,8 @@ package org.apache.iotdb.commons.pipe.task.meta;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -81,10 +80,8 @@ public class PipeTaskMeta {
     progressIndex.get().serialize(outputStream);
     ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
-    for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
-      ReadWriteIOUtils.write(
-          exceptionMessage instanceof PipeRuntimeCriticalException, outputStream);
-      ReadWriteIOUtils.write(exceptionMessage.getMessage(), outputStream);
+    for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
+      pipeRuntimeException.serialize(outputStream);
     }
   }
 
@@ -92,10 +89,8 @@ public class PipeTaskMeta {
     progressIndex.get().serialize(outputStream);
     ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
-    for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
-      ReadWriteIOUtils.write(
-          exceptionMessage instanceof PipeRuntimeCriticalException, outputStream);
-      ReadWriteIOUtils.write(exceptionMessage.getMessage(), outputStream);
+    for (final PipeRuntimeException pipeRuntimeException : exceptionMessages) {
+      pipeRuntimeException.serialize(outputStream);
     }
   }
 
@@ -105,12 +100,9 @@ public class PipeTaskMeta {
     final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex, leaderDataNodeId);
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
-      final boolean critical = ReadWriteIOUtils.readBool(byteBuffer);
-      final String message = ReadWriteIOUtils.readString(byteBuffer);
-      PipeTaskMeta.exceptionMessages.add(
-          critical
-              ? new PipeRuntimeCriticalException(message)
-              : new PipeRuntimeNonCriticalException(message));
+      final PipeRuntimeException pipeRuntimeException =
+          PipeRuntimeExceptionType.deserializeFrom(byteBuffer);
+      PipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
     }
     return PipeTaskMeta;
   }
@@ -121,12 +113,9 @@ public class PipeTaskMeta {
     final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex, leaderDataNodeId);
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
-      final boolean critical = ReadWriteIOUtils.readBool(inputStream);
-      final String message = ReadWriteIOUtils.readString(inputStream);
-      PipeTaskMeta.exceptionMessages.add(
-          critical
-              ? new PipeRuntimeCriticalException(message)
-              : new PipeRuntimeNonCriticalException(message));
+      final PipeRuntimeException pipeRuntimeException =
+          PipeRuntimeExceptionType.deserializeFrom(inputStream);
+      PipeTaskMeta.exceptionMessages.add(pipeRuntimeException);
     }
     return PipeTaskMeta;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b4ae69a80cf..cdd894b5d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1261,6 +1261,7 @@ public class IoTDBConfig {
     triggerTemporaryLibDir = addDataHomeDir(triggerTemporaryLibDir);
     pipeDir = addDataHomeDir(pipeDir);
     pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
+    pipeReceiveFileDir = addDataHomeDir(pipeReceiveFileDir);
     mqttDir = addDataHomeDir(mqttDir);
     extPipeDir = addDataHomeDir(extPipeDir);
     queryDir = addDataHomeDir(queryDir);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 884cecf4a74..b4e4077ea5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.service.IService;
@@ -28,7 +29,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d0596127340..d086e6e71ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -518,9 +518,7 @@ public class PipeTaskAgent {
         .getPipeMeta(pipeStaticMeta.getPipeName())
         .getRuntimeMeta()
         .getConsensusGroupIdToTaskMetaMap()
-        .put(
-            consensusGroupId,
-            new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), pipeTaskMeta.getLeaderDataNodeId()));
+        .put(consensusGroupId, pipeTaskMeta);
   }
 
   private void dropPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 493240455aa..cf01a1ea15a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -28,7 +29,6 @@ import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index e6835187cfc..3263e52c1e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -26,7 +27,6 @@ import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 1b00fdd974e..7b287b10545 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -26,7 +27,6 @@ import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index d320366552e..6f7db5bbf97 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -126,7 +126,10 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
       LOGGER.error(
           "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e);
       // the connection may be broken, try to reconnect by catching PipeConnectionException
-      throw new PipeConnectionException("Network error when transfer tablet insertion event.", e);
+      throw new PipeConnectionException(
+          String.format(
+              "Network error when transfer tablet insertion event, because %s.", e.getMessage()),
+          e);
     }
   }
 
@@ -159,7 +162,10 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
       LOGGER.error(
           "Network error when transfer tsfile insertion event: {}.", tsFileInsertionEvent, e);
       // the connection may be broken, try to reconnect by catching PipeConnectionException
-      throw new PipeConnectionException("Network error when transfer tsfile insertion event.", e);
+      throw new PipeConnectionException(
+          String.format(
+              "Network error when transfer tsfile insertion event, because %s.", e.getMessage()),
+          e);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 94b2975358c..548a48fdc0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.connector.manager;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
@@ -46,9 +45,7 @@ public class PipeConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
-      PipeConnectorSubtaskExecutor executor,
-      PipeParameters pipeConnectorParameters,
-      PipeTaskMeta taskMeta) {
+      PipeConnectorSubtaskExecutor executor, PipeParameters pipeConnectorParameters) {
     final String attributeSortedString =
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
@@ -81,7 +78,7 @@ public class PipeConnectorSubtaskManager {
           new BoundedBlockingPendingQueue<>(
               PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
       final PipeConnectorSubtask pipeConnectorSubtask =
-          new PipeConnectorSubtask(attributeSortedString, taskMeta, pendingQueue, pipeConnector);
+          new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
           new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
       attributeSortedString2SubtaskLifeCycleMap.put(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index 28e611bd301..9fcc5fd1d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.pipe.core.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -114,4 +116,8 @@ public abstract class EnrichedEvent implements Event {
 
   public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta);
+
+  public void reportException(PipeRuntimeException pipeRuntimeException) {
+    PipeAgent.runtime().report(this.pipeTaskMeta, pipeRuntimeException);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 15479bc3788..f15ef605617 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -49,6 +49,21 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent {
     this.device2Measurements = device2Measurements;
   }
 
+  public PipeRealtimeCollectEvent(
+      EnrichedEvent event,
+      TsFileEpoch tsFileEpoch,
+      Map<String, String[]> device2Measurements,
+      PipeTaskMeta pipeTaskMeta) {
+    // bind the given pipeTaskMeta to this event by passing it through to super, so that
+    // EnrichedEvent#reportException can report exceptions against it. this constructor is
+    // used by shallowCopySelfAndBindPipeTaskMetaForProgressReport.
+    super(pipeTaskMeta);
+
+    this.event = event;
+    this.tsFileEpoch = tsFileEpoch;
+    this.device2Measurements = device2Measurements;
+  }
+
   public Event getEvent() {
     return event;
   }
@@ -104,7 +119,8 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent {
     return new PipeRealtimeCollectEvent(
         event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta),
         this.tsFileEpoch,
-        this.device2Measurements);
+        this.device2Measurements,
+        pipeTaskMeta);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 9ccb946ae45..b4a01a7828b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.db.pipe.resource.wal;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.db.wal.exception.MemTablePinException;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 1886f489c41..715816cfbcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -50,14 +50,13 @@ public class PipeTaskBuilder {
             pipeStaticMeta.getCreationTime(),
             pipeStaticMeta.getCollectorParameters());
     final PipeTaskConnectorStage connectorStage =
-        new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters(), pipeTaskMeta);
+        new PipeTaskConnectorStage(pipeStaticMeta.getConnectorParameters());
 
     // the processor connects the collector and connector.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
             dataRegionId,
-            pipeTaskMeta,
             collectorStage.getEventSupplier(),
             collectorStage.getCollectorPendingQueue(),
             pipeStaticMeta.getProcessorParameters(),
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5e43b1a2d18..5615c36f560 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
@@ -33,14 +32,13 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
 
   protected String connectorSubtaskId;
 
-  public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters, PipeTaskMeta taskMeta) {
+  public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
     this.pipeConnectorParameters = pipeConnectorParameters;
     connectorSubtaskId =
         PipeConnectorSubtaskManager.instance()
             .register(
                 PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
-                pipeConnectorParameters,
-                taskMeta);
+                pipeConnectorParameters);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index c0bad32a3f0..7af0df4ecff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
 import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
@@ -53,7 +52,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   /**
    * @param pipeName pipe name
    * @param dataRegionId data region id
-   * @param taskMeta pipe task meta
    * @param pipeCollectorInputEventSupplier used to input events from pipe collector
    * @param pipeCollectorInputPendingQueue used to listen whether pipe collector event queue is from
    *     empty to not empty or from not empty to empty, null means no need to listen
@@ -63,7 +61,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   public PipeTaskProcessorStage(
       String pipeName,
       TConsensusGroupId dataRegionId,
-      PipeTaskMeta taskMeta,
       EventSupplier pipeCollectorInputEventSupplier,
       @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
@@ -78,7 +75,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
-            taskMeta,
             pipeCollectorInputEventSupplier,
             pipeProcessor,
             pipeConnectorOutputEventCollector);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 12802d88e5f..f34b2b3f961 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -29,7 +29,6 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
 
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
@@ -48,10 +47,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
       String taskID,
-      PipeTaskMeta taskMeta,
       BoundedBlockingPendingQueue<Event> inputPendingQueue,
       PipeConnector outputPipeConnector) {
-    super(taskID, taskMeta);
+    super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
     executeOnceInvokedTimes = 0;
@@ -88,7 +86,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
     } catch (PipeConnectionException e) {
       throw e;
     } catch (Exception e) {
-      e.printStackTrace();
+      LOGGER.warn("Execute Connector subtask once error.", e);
       throw new PipeException(
           "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
           e);
@@ -126,10 +124,13 @@ public class PipeConnectorSubtask extends PipeSubtask {
             String.format(
                 "Failed to reconnect to the target system after %d times, stopping current pipe task %s...",
                 MAX_RETRY_TIMES, taskID);
-        LOGGER.warn(errorMessage);
+        LOGGER.warn(errorMessage, throwable);
         lastFailedCause = throwable;
 
-        PipeAgent.runtime().report(taskMeta, new PipeRuntimeCriticalException(errorMessage));
+        if (lastEvent instanceof EnrichedEvent) {
+          ((EnrichedEvent) lastEvent)
+              .reportException(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+        }
 
         // although the pipe task will be stopped, we still don't release the last event here
         // because we need to keep it for the next retry. if user wants to restart the task,
@@ -140,7 +141,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
 
     // handle other exceptions as usual
-    super.onFailure(throwable);
+    super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 94bcb248911..6a76beb02b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -41,11 +40,10 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
   public PipeProcessorSubtask(
       String taskID,
-      PipeTaskMeta taskMeta,
       EventSupplier inputEventSupplier,
       PipeProcessor pipeProcessor,
       EventCollector outputEventCollector) {
-    super(taskID, taskMeta);
+    super(taskID);
     this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index b1331f4df48..524e9abbcfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -19,12 +19,11 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -44,7 +43,6 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtask.class);
 
   protected final String taskID;
-  protected final PipeTaskMeta taskMeta;
 
   private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
   private ExecutorService subtaskCallbackListeningExecutor;
@@ -59,10 +57,9 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   protected Event lastEvent;
 
-  protected PipeSubtask(String taskID, PipeTaskMeta taskMeta) {
+  protected PipeSubtask(String taskID) {
     super();
     this.taskID = taskID;
-    this.taskMeta = taskMeta;
   }
 
   public void bindExecutors(
@@ -112,16 +109,26 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   public void onFailure(@NotNull Throwable throwable) {
     if (retryCount.get() < MAX_RETRY_TIMES) {
       retryCount.incrementAndGet();
+      LOGGER.warn(
+          String.format(
+              "Retry subtask %s, retry count [%s/%s]",
+              this.getClass().getSimpleName(), retryCount.get(), MAX_RETRY_TIMES));
       submitSelf();
     } else {
       final String errorMessage =
           String.format(
               "Subtask %s failed, has been retried for %d times, last failed because of %s",
               taskID, retryCount.get(), throwable);
-      LOGGER.warn(errorMessage);
+      LOGGER.warn(errorMessage, throwable);
       lastFailedCause = throwable;
 
-      PipeAgent.runtime().report(taskMeta, new PipeRuntimeCriticalException(errorMessage));
+      if (lastEvent instanceof EnrichedEvent) {
+        ((EnrichedEvent) lastEvent)
+            .reportException(
+                throwable instanceof PipeRuntimeException
+                    ? (PipeRuntimeException) throwable
+                    : new PipeRuntimeCriticalException(errorMessage));
+      }
 
       // although the pipe task will be stopped, we still don't release the last event here
       // because we need to keep it for the next retry. if user wants to restart the task,
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
index c01d6f51b39..802afcf16d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.queue.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -39,7 +38,6 @@ public class PipeConnectorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeConnectorSubtask(
                 "PipeConnectorSubtaskExecutorTest",
-                mock(PipeTaskMeta.class),
                 mock(BoundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
index c20eab04cbd..d0a5208d537 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
@@ -40,7 +39,6 @@ public class PipeProcessorSubtaskExecutorTest extends PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeProcessorSubtask(
                 "PipeProcessorSubtaskExecutorTest",
-                mock(PipeTaskMeta.class),
                 mock(EventSupplier.class),
                 mock(PipeProcessor.class),
                 mock(EventCollector.class)));