You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/04/25 12:32:38 UTC

(iotdb) branch master updated: Pipe: Reduce exception messages to avoid excess rpc payload and show pipe response (#12415)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 98b2272dbc1 Pipe: Reduce exception messages to avoid excess rpc payload and show pipe response (#12415)
98b2272dbc1 is described below

commit 98b2272dbc180cfb64b56ea33931dcc2d09ec0a4
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Thu Apr 25 20:32:31 2024 +0800

    Pipe: Reduce exception messages to avoid excess rpc payload and show pipe response (#12415)
---
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  5 ++-
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  | 51 +++++++++++++---------
 .../pipe/event/PipeConfigRegionWritePlanEvent.java | 46 +++++++++++++------
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  9 ++--
 .../airgap/IoTDBSchemaRegionAirGapConnector.java   |  5 ++-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |  3 +-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  | 10 +++--
 .../schema/PipeSchemaRegionSnapshotEvent.java      |  9 ++++
 .../schema/PipeSchemaRegionWritePlanEvent.java     | 44 +++++++++++++------
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  9 ++++
 .../subtask/processor/PipeProcessorSubtask.java    | 38 ++++++++--------
 .../commons/pipe/event/PipeSnapshotEvent.java      | 19 +++++---
 .../commons/pipe/event/PipeWritePlanEvent.java     | 35 +++++++++++----
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 28 +++++++-----
 14 files changed, 211 insertions(+), 100 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index bd775cbe635..3235043fb45 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV1Req;
 import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV2Req;
 import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
@@ -121,7 +122,9 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {
       isSocketAlive.set(socketIndex, false);
 
       throw new PipeConnectionException(
-          String.format("Network error when transfer event %s, because %s.", event, e.getMessage()),
+          String.format(
+              "Network error when transfer event %s, because %s.",
+              ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
           e);
     }
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index e5e759d5100..f4c5ad1bd97 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -88,17 +88,17 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
   }
 
   public PipeConfigRegionSnapshotEvent(
-      String snapshotPath, String templateFilePath, CNSnapshotFileType type) {
+      final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) {
     this(snapshotPath, templateFilePath, type, null, null, null);
   }
 
   public PipeConfigRegionSnapshotEvent(
-      String snapshotPath,
-      String templateFilePath,
-      CNSnapshotFileType type,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern) {
+      final String snapshotPath,
+      final String templateFilePath,
+      final CNSnapshotFileType type,
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern) {
     super(pipeName, pipeTaskMeta, pattern, PipeConfigNodeSnapshotResourceManager.getInstance());
     this.snapshotPath = snapshotPath;
     this.templateFilePath = Objects.nonNull(templateFilePath) ? templateFilePath : "";
@@ -118,14 +118,14 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
   }
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
     try {
       snapshotPath = resourceManager.increaseSnapshotReference(snapshotPath);
       if (!templateFilePath.isEmpty()) {
         templateFilePath = resourceManager.increaseSnapshotReference(templateFilePath);
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Increase reference count for snapshot %s error. Holder Message: %s",
@@ -136,14 +136,14 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
     try {
       resourceManager.decreaseSnapshotReference(snapshotPath);
       if (!templateFilePath.isEmpty()) {
         resourceManager.decreaseSnapshotReference(templateFilePath);
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Decrease reference count for snapshot %s error. Holder Message: %s",
@@ -155,18 +155,18 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeConfigRegionSnapshotEvent(
         snapshotPath, templateFilePath, fileType, pipeName, pipeTaskMeta, pattern);
   }
 
   @Override
   public ByteBuffer serializeToByteBuffer() {
-    ByteBuffer result =
+    final ByteBuffer result =
         ByteBuffer.allocate(
             2 * Byte.BYTES
                 + 2 * Integer.BYTES
@@ -180,7 +180,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
   }
 
   @Override
-  public void deserializeFromByteBuffer(ByteBuffer buffer) {
+  public void deserializeFromByteBuffer(final ByteBuffer buffer) {
     fileType = CNSnapshotFileType.deserialize(ReadWriteIOUtils.readByte(buffer));
     snapshotPath = ReadWriteIOUtils.readString(buffer);
     templateFilePath = ReadWriteIOUtils.readString(buffer);
@@ -188,7 +188,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
 
   /////////////////////////////// Type parsing ///////////////////////////////
 
-  public static boolean needTransferSnapshot(Set<ConfigPhysicalPlanType> listenedTypeSet) {
+  public static boolean needTransferSnapshot(final Set<ConfigPhysicalPlanType> listenedTypeSet) {
     final Set<Short> types =
         SNAPSHOT_FILE_TYPE_2_CONFIG_PHYSICAL_PLAN_TYPE_MAP.values().stream()
             .flatMap(Collection::stream)
@@ -200,7 +200,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
     return !types.isEmpty();
   }
 
-  public void confineTransferredTypes(Set<ConfigPhysicalPlanType> listenedTypeSet) {
+  public void confineTransferredTypes(final Set<ConfigPhysicalPlanType> listenedTypeSet) {
     final Set<Short> types =
         new HashSet<>(SNAPSHOT_FILE_TYPE_2_CONFIG_PHYSICAL_PLAN_TYPE_MAP.get(fileType));
     types.retainAll(
@@ -210,7 +210,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
     transferredTypes = types;
   }
 
-  public static Set<ConfigPhysicalPlanType> getConfigPhysicalPlanTypeSet(String sealTypes) {
+  public static Set<ConfigPhysicalPlanType> getConfigPhysicalPlanTypeSet(final String sealTypes) {
     return sealTypes.isEmpty()
         ? Collections.emptySet()
         : Arrays.stream(sealTypes.split(","))
@@ -231,4 +231,13 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
         + " - "
         + super.toString();
   }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeConfigRegionSnapshotEvent{snapshotPath=%s, templateFilePath=%s, fileType=%s}",
+            snapshotPath, templateFilePath, fileType)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index 52da0b27c6b..c6996907460 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -40,16 +40,16 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
   }
 
   public PipeConfigRegionWritePlanEvent(
-      ConfigPhysicalPlan configPhysicalPlan, boolean isGeneratedByPipe) {
+      final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) {
     this(configPhysicalPlan, null, null, null, isGeneratedByPipe);
   }
 
   public PipeConfigRegionWritePlanEvent(
-      ConfigPhysicalPlan configPhysicalPlan,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      boolean isGeneratedByPipe) {
+      final ConfigPhysicalPlan configPhysicalPlan,
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final boolean isGeneratedByPipe) {
     super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
     this.configPhysicalPlan = configPhysicalPlan;
   }
@@ -60,19 +60,19 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeConfigRegionWritePlanEvent(
         configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
   }
 
   @Override
   public ByteBuffer serializeToByteBuffer() {
-    ByteBuffer planBuffer = configPhysicalPlan.serializeToByteBuffer();
-    ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
+    final ByteBuffer planBuffer = configPhysicalPlan.serializeToByteBuffer();
+    final ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
     ReadWriteIOUtils.write(PipeConfigSerializableEventType.CONFIG_WRITE_PLAN.getType(), result);
     ReadWriteIOUtils.write(isGeneratedByPipe, result);
     result.put(planBuffer);
@@ -80,8 +80,26 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
   }
 
   @Override
-  public void deserializeFromByteBuffer(ByteBuffer buffer) throws IOException {
+  public void deserializeFromByteBuffer(final ByteBuffer buffer) throws IOException {
     isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
     configPhysicalPlan = ConfigPhysicalPlan.Factory.create(buffer);
   }
+
+  /////////////////////////// Object ///////////////////////////
+
+  @Override
+  public String toString() {
+    return String.format(
+            "PipeConfigRegionWritePlanEvent{configPhysicalPlan=%s}", configPhysicalPlan)
+        + " - "
+        + super.toString();
+  }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeConfigRegionWritePlanEvent{configPhysicalPlan=%s}", configPhysicalPlan)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 7a187d624e1..0548e4da48f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.protocol.airgap;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -82,7 +83,7 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer tablet insertion event %s, because %s.",
-              tabletInsertionEvent, e.getMessage()),
+              ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
           e);
     }
   }
@@ -115,7 +116,8 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer tsfile insertion event %s, because %s.",
-              tsFileInsertionEvent, e.getMessage()),
+              ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage(),
+              e.getMessage()),
           e);
     }
   }
@@ -138,7 +140,8 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
 
       throw new PipeConnectionException(
           String.format(
-              "Network error when transfer tsfile event %s, because %s.", event, e.getMessage()),
+              "Network error when transfer tsfile event %s, because %s.",
+              ((PipeSchemaRegionWritePlanEvent) event).coreReportMessage(), e.getMessage()),
           e);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index c5755832ce8..74375cff112 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.protocol.airgap;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -76,7 +77,9 @@ public class IoTDBSchemaRegionAirGapConnector extends IoTDBDataNodeAirGapConnect
       isSocketAlive.set(socketIndex, false);
 
       throw new PipeConnectionException(
-          String.format("Network error when transfer event %s, because %s.", event, e.getMessage()),
+          String.format(
+              "Network error when transfer event %s, because %s.",
+              ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
           e);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 3f16d3563c5..dd12caf55a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -293,7 +293,8 @@ public class IoTDBLegacyPipeConnector implements PipeConnector {
     } catch (final TException e) {
       throw new PipeConnectionException(
           String.format(
-              "Network error when transfer tsFile insertion event: %s.", tsFileInsertionEvent),
+              "Network error when transfer tsFile insertion event: %s.",
+              ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage()),
           e);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 7c19f9f44eb..9919d954b30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
@@ -112,7 +113,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
       throw new PipeConnectionException(
           String.format(
               "Failed to transfer tablet insertion event %s, because %s.",
-              tabletInsertionEvent, e.getMessage()),
+              ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
           e);
     }
   }
@@ -138,7 +139,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
       throw new PipeConnectionException(
           String.format(
               "Failed to transfer tsfile insertion event %s, because %s.",
-              tsFileInsertionEvent, e.getMessage()),
+              ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage(),
+              e.getMessage()),
           e);
     }
   }
@@ -248,7 +250,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
           status,
           String.format(
               "Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
-              pipeInsertNodeTabletInsertionEvent, status),
+              pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
           pipeInsertNodeTabletInsertionEvent.toString());
     }
     if (insertNode != null && status.isSetRedirectNode()) {
@@ -303,7 +305,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
           status,
           String.format(
               "Transfer PipeRawTabletInsertionEvent %s error, result status %s",
-              pipeRawTabletInsertionEvent, status),
+              pipeRawTabletInsertionEvent.coreReportMessage(), status),
           pipeRawTabletInsertionEvent.toString());
     }
     if (status.isSetRedirectNode()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index e984c0c6f09..1b51c7f5568 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -212,4 +212,13 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
         + " - "
         + super.toString();
   }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeSchemaRegionSnapshotEvent{mTreeSnapshotPath=%s, tagLogSnapshotPath=%s, databaseName=%s}",
+            mTreeSnapshotPath, tagLogSnapshotPath, databaseName)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index 6e944388e52..6e62fe80610 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -39,16 +39,16 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
     this(null, false);
   }
 
-  public PipeSchemaRegionWritePlanEvent(PlanNode planNode, boolean isGeneratedByPipe) {
+  public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean isGeneratedByPipe) {
     this(planNode, null, null, null, isGeneratedByPipe);
   }
 
   public PipeSchemaRegionWritePlanEvent(
-      PlanNode planNode,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      boolean isGeneratedByPipe) {
+      final PlanNode planNode,
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final boolean isGeneratedByPipe) {
     super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
     this.planNode = planNode;
   }
@@ -59,19 +59,19 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeSchemaRegionWritePlanEvent(
         planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
   }
 
   @Override
   public ByteBuffer serializeToByteBuffer() {
-    ByteBuffer planBuffer = planNode.serializeToByteBuffer();
-    ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
+    final ByteBuffer planBuffer = planNode.serializeToByteBuffer();
+    final ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
     ReadWriteIOUtils.write(PipeSchemaSerializableEventType.SCHEMA_WRITE_PLAN.getType(), result);
     ReadWriteIOUtils.write(isGeneratedByPipe, result);
     result.put(planBuffer);
@@ -79,8 +79,24 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
   }
 
   @Override
-  public void deserializeFromByteBuffer(ByteBuffer buffer) {
+  public void deserializeFromByteBuffer(final ByteBuffer buffer) {
     isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
     planNode = PlanNodeType.deserialize(buffer);
   }
+
+  /////////////////////////// Object ///////////////////////////
+
+  @Override
+  public String toString() {
+    return String.format("PipeConfigRegionWritePlanEvent{planNode=%s}", planNode)
+        + " - "
+        + super.toString();
+  }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format("PipeConfigRegionWritePlanEvent{planNode=%s}", planNode)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index c609eb907f5..3d951d0e394 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -367,4 +367,13 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
         + " - "
         + super.toString();
   }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeTsFileInsertionEvent{resource=%s, tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s}",
+            resource, tsFile, isLoaded, isGeneratedByPipe, isClosed.get())
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 45568782948..3c47d3b873d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -65,13 +65,13 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
   private final long subtaskCreationTime;
 
   public PipeProcessorSubtask(
-      String taskID,
-      long creationTime,
-      String pipeName,
-      int dataRegionId,
-      EventSupplier inputEventSupplier,
-      PipeProcessor pipeProcessor,
-      PipeEventCollector outputEventCollector) {
+      final String taskID,
+      final long creationTime,
+      final String pipeName,
+      final int dataRegionId,
+      final EventSupplier inputEventSupplier,
+      final PipeProcessor pipeProcessor,
+      final PipeEventCollector outputEventCollector) {
     super(taskID, creationTime);
     this.subtaskCreationTime = System.currentTimeMillis();
     this.pipeName = pipeName;
@@ -84,9 +84,9 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
 
   @Override
   public void bindExecutors(
-      ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      ExecutorService ignored,
-      PipeSubtaskScheduler subtaskScheduler) {
+      final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+      final ExecutorService ignored,
+      final PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskScheduler = subtaskScheduler;
 
@@ -150,17 +150,21 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
       }
       decreaseReferenceCountAndReleaseLastEvent(
           !isClosed.get() && outputEventCollector.hasNoCollectInvocationAfterReset());
-    } catch (PipeRuntimeOutOfMemoryCriticalException e) {
+    } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
       LOGGER.info(
           "Temporarily out of memory in pipe event processing, will wait for the memory to release.",
           e);
       return false;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       if (!isClosed.get()) {
         throw new PipeException(
             String.format(
                 "Exception in pipe process, subtask: %s, last event: %s, root cause: %s",
-                taskID, lastEvent, ErrorHandlingUtils.getRootCause(e).getMessage()),
+                taskID,
+                lastEvent instanceof EnrichedEvent
+                    ? ((EnrichedEvent) lastEvent).coreReportMessage()
+                    : lastEvent,
+                ErrorHandlingUtils.getRootCause(e).getMessage()),
             e);
       } else {
         LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e);
@@ -191,7 +195,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
       // pipeProcessor closes first, then no more events will be added into outputEventCollector.
       // only after that, outputEventCollector can be closed.
       pipeProcessor.close();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.info(
           "Exception occurred when closing pipe processor subtask {}, root cause: {}",
           taskID,
@@ -210,7 +214,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
@@ -252,12 +256,12 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
   //////////////////////////// Error report ////////////////////////////
 
   @Override
-  protected String getRootCause(Throwable throwable) {
+  protected String getRootCause(final Throwable throwable) {
     return ErrorHandlingUtils.getRootCause(throwable).getMessage();
   }
 
   @Override
-  protected void report(EnrichedEvent event, PipeRuntimeException exception) {
+  protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
     PipeAgent.runtime().report(event, exception);
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index b7193ef4dbf..fa26319e6dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -34,16 +34,16 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent implements Seriali
   protected Set<Short> transferredTypes;
 
   protected PipeSnapshotEvent(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      PipeSnapshotResourceManager resourceManager) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final PipeSnapshotResourceManager resourceManager) {
     super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
     this.resourceManager = resourceManager;
   }
 
   @Override
-  public void bindProgressIndex(ProgressIndex progressIndex) {
+  public void bindProgressIndex(final ProgressIndex progressIndex) {
     this.progressIndex = progressIndex;
   }
 
@@ -80,4 +80,13 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent implements Seriali
         + " - "
         + super.toString();
   }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeSnapshotEvent{progressIndex=%s, transferredTypes=%s}",
+            progressIndex, transferredTypes)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index e31e9c85efc..2e9619d65fa 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -26,20 +26,19 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 public abstract class PipeWritePlanEvent extends EnrichedEvent implements SerializableEvent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeWritePlanEvent.class);
 
   protected boolean isGeneratedByPipe;
 
-  protected final AtomicLong referenceCount = new AtomicLong(0);
-
   protected ProgressIndex progressIndex;
 
   protected PipeWritePlanEvent(
-      String pipeName, PipeTaskMeta pipeTaskMeta, PipePattern pattern, boolean isGeneratedByPipe) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final boolean isGeneratedByPipe) {
     super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
     this.isGeneratedByPipe = isGeneratedByPipe;
   }
@@ -49,7 +48,7 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
    * We just use a counter to prevent the reference count from being less than 0.
    */
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
     referenceCount.incrementAndGet();
     return true;
   }
@@ -59,7 +58,7 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
    * We just use a counter to prevent the reference count from being less than 0.
    */
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
     final long count = referenceCount.decrementAndGet();
     if (count < 0) {
       LOGGER.warn("The reference count is less than 0, may need to check the implementation.");
@@ -68,7 +67,7 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
   }
 
   @Override
-  public void bindProgressIndex(ProgressIndex progressIndex) {
+  public void bindProgressIndex(final ProgressIndex progressIndex) {
     this.progressIndex = progressIndex;
   }
 
@@ -86,4 +85,24 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
   public boolean mayEventTimeOverlappedWithTimeRange() {
     return true;
   }
+
+  /////////////////////////// Object ///////////////////////////
+
+  @Override
+  public String toString() {
+    return String.format(
+            "PipeWritePlanEvent{progressIndex=%s, isGeneratedByPipe=%s}",
+            progressIndex, isGeneratedByPipe)
+        + " - "
+        + super.toString();
+  }
+
+  @Override
+  public String coreReportMessage() {
+    return String.format(
+            "PipeWritePlanEvent{progressIndex=%s, isGeneratedByPipe=%s}",
+            progressIndex, isGeneratedByPipe)
+        + " - "
+        + super.coreReportMessage();
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 1b3ab0b1ef5..2a22ab9a7a6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -57,7 +57,7 @@ public class PipeTaskMeta {
   private final Map<PipeRuntimeException, PipeRuntimeException> exceptionMessages =
       new ConcurrentHashMap<>();
 
-  public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int leaderNodeId) {
+  public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final int leaderNodeId) {
     this.progressIndex.set(progressIndex);
     this.leaderNodeId.set(leaderNodeId);
   }
@@ -66,7 +66,7 @@ public class PipeTaskMeta {
     return progressIndex.get();
   }
 
-  public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+  public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
     return progressIndex.updateAndGet(
         index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
   }
@@ -75,7 +75,7 @@ public class PipeTaskMeta {
     return leaderNodeId.get();
   }
 
-  public void setLeaderNodeId(int leaderNodeId) {
+  public void setLeaderNodeId(final int leaderNodeId) {
     this.leaderNodeId.set(leaderNodeId);
   }
 
@@ -87,11 +87,16 @@ public class PipeTaskMeta {
     return exceptionMessages.toString();
   }
 
-  public synchronized void trackExceptionMessage(PipeRuntimeException exceptionMessage) {
+  public synchronized void trackExceptionMessage(final PipeRuntimeException exceptionMessage) {
+    // Only keep the newest exception message to avoid excess rpc payload and
+    // show pipe response
+    // Here we still keep the map form to allow compatibility with legacy versions
+    exceptionMessages.clear();
     exceptionMessages.put(exceptionMessage, exceptionMessage);
   }
 
-  public synchronized boolean containsExceptionMessage(PipeRuntimeException exceptionMessage) {
+  public synchronized boolean containsExceptionMessage(
+      final PipeRuntimeException exceptionMessage) {
     return exceptionMessages.containsKey(exceptionMessage);
   }
 
@@ -103,7 +108,7 @@ public class PipeTaskMeta {
     exceptionMessages.clear();
   }
 
-  public synchronized void serialize(OutputStream outputStream) throws IOException {
+  public synchronized void serialize(final OutputStream outputStream) throws IOException {
     progressIndex.get().serialize(outputStream);
 
     ReadWriteIOUtils.write(leaderNodeId.get(), outputStream);
@@ -114,7 +119,8 @@ public class PipeTaskMeta {
     }
   }
 
-  public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
+  public static PipeTaskMeta deserialize(
+      final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
     final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer);
 
     final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer);
@@ -129,8 +135,8 @@ public class PipeTaskMeta {
     return pipeTaskMeta;
   }
 
-  public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version, InputStream inputStream)
-      throws IOException {
+  public static PipeTaskMeta deserialize(
+      final PipeRuntimeMetaVersion version, final InputStream inputStream) throws IOException {
     final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(inputStream);
 
     final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream);
@@ -146,14 +152,14 @@ public class PipeTaskMeta {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTaskMeta that = (PipeTaskMeta) obj;
+    final PipeTaskMeta that = (PipeTaskMeta) obj;
     return progressIndex.get().equals(that.progressIndex.get())
         && leaderNodeId.get() == that.leaderNodeId.get()
         && exceptionMessages.equals(that.exceptionMessages);