You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/04/25 12:32:38 UTC
(iotdb) branch master updated: Pipe: Reduce exception messages to avoid excess rpc payload and show pipe response (#12415)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98b2272dbc1 Pipe: Reduce exception messages to avoid excess rpc payload and show pipe response (#12415)
98b2272dbc1 is described below
commit 98b2272dbc180cfb64b56ea33931dcc2d09ec0a4
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Thu Apr 25 20:32:31 2024 +0800
Pipe: Reduce exception messages to avoid excess rpc payload and show pipe response (#12415)
---
.../protocol/IoTDBConfigRegionAirGapConnector.java | 5 ++-
.../pipe/event/PipeConfigRegionSnapshotEvent.java | 51 +++++++++++++---------
.../pipe/event/PipeConfigRegionWritePlanEvent.java | 46 +++++++++++++------
.../airgap/IoTDBDataRegionAirGapConnector.java | 9 ++--
.../airgap/IoTDBSchemaRegionAirGapConnector.java | 5 ++-
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 3 +-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 10 +++--
.../schema/PipeSchemaRegionSnapshotEvent.java | 9 ++++
.../schema/PipeSchemaRegionWritePlanEvent.java | 44 +++++++++++++------
.../common/tsfile/PipeTsFileInsertionEvent.java | 9 ++++
.../subtask/processor/PipeProcessorSubtask.java | 38 ++++++++--------
.../commons/pipe/event/PipeSnapshotEvent.java | 19 +++++---
.../commons/pipe/event/PipeWritePlanEvent.java | 35 +++++++++++----
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 28 +++++++-----
14 files changed, 211 insertions(+), 100 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index bd775cbe635..3235043fb45 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV1Req;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigNodeHandshakeV2Req;
import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigPlanReq;
@@ -121,7 +122,9 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {
isSocketAlive.set(socketIndex, false);
throw new PipeConnectionException(
- String.format("Network error when transfer event %s, because %s.", event, e.getMessage()),
+ String.format(
+ "Network error when transfer event %s, because %s.",
+ ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index e5e759d5100..f4c5ad1bd97 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -88,17 +88,17 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
}
public PipeConfigRegionSnapshotEvent(
- String snapshotPath, String templateFilePath, CNSnapshotFileType type) {
+ final String snapshotPath, final String templateFilePath, final CNSnapshotFileType type) {
this(snapshotPath, templateFilePath, type, null, null, null);
}
public PipeConfigRegionSnapshotEvent(
- String snapshotPath,
- String templateFilePath,
- CNSnapshotFileType type,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern) {
+ final String snapshotPath,
+ final String templateFilePath,
+ final CNSnapshotFileType type,
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern) {
super(pipeName, pipeTaskMeta, pattern, PipeConfigNodeSnapshotResourceManager.getInstance());
this.snapshotPath = snapshotPath;
this.templateFilePath = Objects.nonNull(templateFilePath) ? templateFilePath : "";
@@ -118,14 +118,14 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
}
@Override
- public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
try {
snapshotPath = resourceManager.increaseSnapshotReference(snapshotPath);
if (!templateFilePath.isEmpty()) {
templateFilePath = resourceManager.increaseSnapshotReference(templateFilePath);
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for snapshot %s error. Holder Message: %s",
@@ -136,14 +136,14 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
try {
resourceManager.decreaseSnapshotReference(snapshotPath);
if (!templateFilePath.isEmpty()) {
resourceManager.decreaseSnapshotReference(templateFilePath);
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for snapshot %s error. Holder Message: %s",
@@ -155,18 +155,18 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeConfigRegionSnapshotEvent(
snapshotPath, templateFilePath, fileType, pipeName, pipeTaskMeta, pattern);
}
@Override
public ByteBuffer serializeToByteBuffer() {
- ByteBuffer result =
+ final ByteBuffer result =
ByteBuffer.allocate(
2 * Byte.BYTES
+ 2 * Integer.BYTES
@@ -180,7 +180,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
}
@Override
- public void deserializeFromByteBuffer(ByteBuffer buffer) {
+ public void deserializeFromByteBuffer(final ByteBuffer buffer) {
fileType = CNSnapshotFileType.deserialize(ReadWriteIOUtils.readByte(buffer));
snapshotPath = ReadWriteIOUtils.readString(buffer);
templateFilePath = ReadWriteIOUtils.readString(buffer);
@@ -188,7 +188,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
/////////////////////////////// Type parsing ///////////////////////////////
- public static boolean needTransferSnapshot(Set<ConfigPhysicalPlanType> listenedTypeSet) {
+ public static boolean needTransferSnapshot(final Set<ConfigPhysicalPlanType> listenedTypeSet) {
final Set<Short> types =
SNAPSHOT_FILE_TYPE_2_CONFIG_PHYSICAL_PLAN_TYPE_MAP.values().stream()
.flatMap(Collection::stream)
@@ -200,7 +200,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
return !types.isEmpty();
}
- public void confineTransferredTypes(Set<ConfigPhysicalPlanType> listenedTypeSet) {
+ public void confineTransferredTypes(final Set<ConfigPhysicalPlanType> listenedTypeSet) {
final Set<Short> types =
new HashSet<>(SNAPSHOT_FILE_TYPE_2_CONFIG_PHYSICAL_PLAN_TYPE_MAP.get(fileType));
types.retainAll(
@@ -210,7 +210,7 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
transferredTypes = types;
}
- public static Set<ConfigPhysicalPlanType> getConfigPhysicalPlanTypeSet(String sealTypes) {
+ public static Set<ConfigPhysicalPlanType> getConfigPhysicalPlanTypeSet(final String sealTypes) {
return sealTypes.isEmpty()
? Collections.emptySet()
: Arrays.stream(sealTypes.split(","))
@@ -231,4 +231,13 @@ public class PipeConfigRegionSnapshotEvent extends PipeSnapshotEvent {
+ " - "
+ super.toString();
}
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeConfigRegionSnapshotEvent{snapshotPath=%s, templateFilePath=%s, fileType=%s}",
+ snapshotPath, templateFilePath, fileType)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index 52da0b27c6b..c6996907460 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -40,16 +40,16 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
}
public PipeConfigRegionWritePlanEvent(
- ConfigPhysicalPlan configPhysicalPlan, boolean isGeneratedByPipe) {
+ final ConfigPhysicalPlan configPhysicalPlan, final boolean isGeneratedByPipe) {
this(configPhysicalPlan, null, null, null, isGeneratedByPipe);
}
public PipeConfigRegionWritePlanEvent(
- ConfigPhysicalPlan configPhysicalPlan,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- boolean isGeneratedByPipe) {
+ final ConfigPhysicalPlan configPhysicalPlan,
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
this.configPhysicalPlan = configPhysicalPlan;
}
@@ -60,19 +60,19 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeConfigRegionWritePlanEvent(
configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
}
@Override
public ByteBuffer serializeToByteBuffer() {
- ByteBuffer planBuffer = configPhysicalPlan.serializeToByteBuffer();
- ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
+ final ByteBuffer planBuffer = configPhysicalPlan.serializeToByteBuffer();
+ final ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
ReadWriteIOUtils.write(PipeConfigSerializableEventType.CONFIG_WRITE_PLAN.getType(), result);
ReadWriteIOUtils.write(isGeneratedByPipe, result);
result.put(planBuffer);
@@ -80,8 +80,26 @@ public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent {
}
@Override
- public void deserializeFromByteBuffer(ByteBuffer buffer) throws IOException {
+ public void deserializeFromByteBuffer(final ByteBuffer buffer) throws IOException {
isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
configPhysicalPlan = ConfigPhysicalPlan.Factory.create(buffer);
}
+
+ /////////////////////////// Object ///////////////////////////
+
+ @Override
+ public String toString() {
+ return String.format(
+ "PipeConfigRegionWritePlanEvent{configPhysicalPlan=%s}", configPhysicalPlan)
+ + " - "
+ + super.toString();
+ }
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeConfigRegionWritePlanEvent{configPhysicalPlan=%s}", configPhysicalPlan)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 7a187d624e1..0548e4da48f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -82,7 +83,7 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
throw new PipeConnectionException(
String.format(
"Network error when transfer tablet insertion event %s, because %s.",
- tabletInsertionEvent, e.getMessage()),
+ ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
e);
}
}
@@ -115,7 +116,8 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile insertion event %s, because %s.",
- tsFileInsertionEvent, e.getMessage()),
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage(),
+ e.getMessage()),
e);
}
}
@@ -138,7 +140,8 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
throw new PipeConnectionException(
String.format(
- "Network error when transfer tsfile event %s, because %s.", event, e.getMessage()),
+ "Network error when transfer tsfile event %s, because %s.",
+ ((PipeSchemaRegionWritePlanEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index c5755832ce8..74375cff112 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -76,7 +77,9 @@ public class IoTDBSchemaRegionAirGapConnector extends IoTDBDataNodeAirGapConnect
isSocketAlive.set(socketIndex, false);
throw new PipeConnectionException(
- String.format("Network error when transfer event %s, because %s.", event, e.getMessage()),
+ String.format(
+ "Network error when transfer event %s, because %s.",
+ ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 3f16d3563c5..dd12caf55a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -293,7 +293,8 @@ public class IoTDBLegacyPipeConnector implements PipeConnector {
} catch (final TException e) {
throw new PipeConnectionException(
String.format(
- "Network error when transfer tsFile insertion event: %s.", tsFileInsertionEvent),
+ "Network error when transfer tsFile insertion event: %s.",
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage()),
e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 7c19f9f44eb..9919d954b30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
@@ -112,7 +113,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
throw new PipeConnectionException(
String.format(
"Failed to transfer tablet insertion event %s, because %s.",
- tabletInsertionEvent, e.getMessage()),
+ ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
e);
}
}
@@ -138,7 +139,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
throw new PipeConnectionException(
String.format(
"Failed to transfer tsfile insertion event %s, because %s.",
- tsFileInsertionEvent, e.getMessage()),
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent).coreReportMessage(),
+ e.getMessage()),
e);
}
}
@@ -248,7 +250,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
status,
String.format(
"Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
- pipeInsertNodeTabletInsertionEvent, status),
+ pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
pipeInsertNodeTabletInsertionEvent.toString());
}
if (insertNode != null && status.isSetRedirectNode()) {
@@ -303,7 +305,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
status,
String.format(
"Transfer PipeRawTabletInsertionEvent %s error, result status %s",
- pipeRawTabletInsertionEvent, status),
+ pipeRawTabletInsertionEvent.coreReportMessage(), status),
pipeRawTabletInsertionEvent.toString());
}
if (status.isSetRedirectNode()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index e984c0c6f09..1b51c7f5568 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -212,4 +212,13 @@ public class PipeSchemaRegionSnapshotEvent extends PipeSnapshotEvent {
+ " - "
+ super.toString();
}
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeSchemaRegionSnapshotEvent{mTreeSnapshotPath=%s, tagLogSnapshotPath=%s, databaseName=%s}",
+ mTreeSnapshotPath, tagLogSnapshotPath, databaseName)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index 6e944388e52..6e62fe80610 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -39,16 +39,16 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
this(null, false);
}
- public PipeSchemaRegionWritePlanEvent(PlanNode planNode, boolean isGeneratedByPipe) {
+ public PipeSchemaRegionWritePlanEvent(final PlanNode planNode, final boolean isGeneratedByPipe) {
this(planNode, null, null, null, isGeneratedByPipe);
}
public PipeSchemaRegionWritePlanEvent(
- PlanNode planNode,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- boolean isGeneratedByPipe) {
+ final PlanNode planNode,
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
this.planNode = planNode;
}
@@ -59,19 +59,19 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeSchemaRegionWritePlanEvent(
planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
}
@Override
public ByteBuffer serializeToByteBuffer() {
- ByteBuffer planBuffer = planNode.serializeToByteBuffer();
- ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
+ final ByteBuffer planBuffer = planNode.serializeToByteBuffer();
+ final ByteBuffer result = ByteBuffer.allocate(Byte.BYTES * 2 + planBuffer.limit());
ReadWriteIOUtils.write(PipeSchemaSerializableEventType.SCHEMA_WRITE_PLAN.getType(), result);
ReadWriteIOUtils.write(isGeneratedByPipe, result);
result.put(planBuffer);
@@ -79,8 +79,24 @@ public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent {
}
@Override
- public void deserializeFromByteBuffer(ByteBuffer buffer) {
+ public void deserializeFromByteBuffer(final ByteBuffer buffer) {
isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
planNode = PlanNodeType.deserialize(buffer);
}
+
+ /////////////////////////// Object ///////////////////////////
+
+ @Override
+ public String toString() {
+ return String.format("PipeConfigRegionWritePlanEvent{planNode=%s}", planNode)
+ + " - "
+ + super.toString();
+ }
+
+ @Override
+ public String coreReportMessage() {
+ return String.format("PipeConfigRegionWritePlanEvent{planNode=%s}", planNode)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index c609eb907f5..3d951d0e394 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -367,4 +367,13 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
+ " - "
+ super.toString();
}
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeTsFileInsertionEvent{resource=%s, tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s}",
+ resource, tsFile, isLoaded, isGeneratedByPipe, isClosed.get())
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 45568782948..3c47d3b873d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -65,13 +65,13 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
private final long subtaskCreationTime;
public PipeProcessorSubtask(
- String taskID,
- long creationTime,
- String pipeName,
- int dataRegionId,
- EventSupplier inputEventSupplier,
- PipeProcessor pipeProcessor,
- PipeEventCollector outputEventCollector) {
+ final String taskID,
+ final long creationTime,
+ final String pipeName,
+ final int dataRegionId,
+ final EventSupplier inputEventSupplier,
+ final PipeProcessor pipeProcessor,
+ final PipeEventCollector outputEventCollector) {
super(taskID, creationTime);
this.subtaskCreationTime = System.currentTimeMillis();
this.pipeName = pipeName;
@@ -84,9 +84,9 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
@Override
public void bindExecutors(
- ListeningExecutorService subtaskWorkerThreadPoolExecutor,
- ExecutorService ignored,
- PipeSubtaskScheduler subtaskScheduler) {
+ final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+ final ExecutorService ignored,
+ final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskScheduler = subtaskScheduler;
@@ -150,17 +150,21 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
}
decreaseReferenceCountAndReleaseLastEvent(
!isClosed.get() && outputEventCollector.hasNoCollectInvocationAfterReset());
- } catch (PipeRuntimeOutOfMemoryCriticalException e) {
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
e);
return false;
- } catch (Exception e) {
+ } catch (final Exception e) {
if (!isClosed.get()) {
throw new PipeException(
String.format(
"Exception in pipe process, subtask: %s, last event: %s, root cause: %s",
- taskID, lastEvent, ErrorHandlingUtils.getRootCause(e).getMessage()),
+ taskID,
+ lastEvent instanceof EnrichedEvent
+ ? ((EnrichedEvent) lastEvent).coreReportMessage()
+ : lastEvent,
+ ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e);
@@ -191,7 +195,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
// pipeProcessor closes first, then no more events will be added into outputEventCollector.
// only after that, outputEventCollector can be closed.
pipeProcessor.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.info(
"Exception occurred when closing pipe processor subtask {}, root cause: {}",
taskID,
@@ -210,7 +214,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
@@ -252,12 +256,12 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
//////////////////////////// Error report ////////////////////////////
@Override
- protected String getRootCause(Throwable throwable) {
+ protected String getRootCause(final Throwable throwable) {
return ErrorHandlingUtils.getRootCause(throwable).getMessage();
}
@Override
- protected void report(EnrichedEvent event, PipeRuntimeException exception) {
+ protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
PipeAgent.runtime().report(event, exception);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index b7193ef4dbf..fa26319e6dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -34,16 +34,16 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent implements Seriali
protected Set<Short> transferredTypes;
protected PipeSnapshotEvent(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- PipeSnapshotResourceManager resourceManager) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final PipeSnapshotResourceManager resourceManager) {
super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this.resourceManager = resourceManager;
}
@Override
- public void bindProgressIndex(ProgressIndex progressIndex) {
+ public void bindProgressIndex(final ProgressIndex progressIndex) {
this.progressIndex = progressIndex;
}
@@ -80,4 +80,13 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent implements Seriali
+ " - "
+ super.toString();
}
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeSnapshotEvent{progressIndex=%s, transferredTypes=%s}",
+ progressIndex, transferredTypes)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index e31e9c85efc..2e9619d65fa 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -26,20 +26,19 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
public abstract class PipeWritePlanEvent extends EnrichedEvent implements SerializableEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeWritePlanEvent.class);
protected boolean isGeneratedByPipe;
- protected final AtomicLong referenceCount = new AtomicLong(0);
-
protected ProgressIndex progressIndex;
protected PipeWritePlanEvent(
- String pipeName, PipeTaskMeta pipeTaskMeta, PipePattern pattern, boolean isGeneratedByPipe) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this.isGeneratedByPipe = isGeneratedByPipe;
}
@@ -49,7 +48,7 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
* We just use a counter to prevent the reference count from being less than 0.
*/
@Override
- public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
referenceCount.incrementAndGet();
return true;
}
@@ -59,7 +58,7 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
* We just use a counter to prevent the reference count from being less than 0.
*/
@Override
- public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
final long count = referenceCount.decrementAndGet();
if (count < 0) {
LOGGER.warn("The reference count is less than 0, may need to check the implementation.");
@@ -68,7 +67,7 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
}
@Override
- public void bindProgressIndex(ProgressIndex progressIndex) {
+ public void bindProgressIndex(final ProgressIndex progressIndex) {
this.progressIndex = progressIndex;
}
@@ -86,4 +85,24 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
public boolean mayEventTimeOverlappedWithTimeRange() {
return true;
}
+
+ /////////////////////////// Object ///////////////////////////
+
+ @Override
+ public String toString() {
+ return String.format(
+ "PipeWritePlanEvent{progressIndex=%s, isGeneratedByPipe=%s}",
+ progressIndex, isGeneratedByPipe)
+ + " - "
+ + super.toString();
+ }
+
+ @Override
+ public String coreReportMessage() {
+ return String.format(
+ "PipeWritePlanEvent{progressIndex=%s, isGeneratedByPipe=%s}",
+ progressIndex, isGeneratedByPipe)
+ + " - "
+ + super.coreReportMessage();
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 1b3ab0b1ef5..2a22ab9a7a6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -57,7 +57,7 @@ public class PipeTaskMeta {
private final Map<PipeRuntimeException, PipeRuntimeException> exceptionMessages =
new ConcurrentHashMap<>();
- public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int leaderNodeId) {
+ public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final int leaderNodeId) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
}
@@ -66,7 +66,7 @@ public class PipeTaskMeta {
return progressIndex.get();
}
- public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+ public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
return progressIndex.updateAndGet(
index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
}
@@ -75,7 +75,7 @@ public class PipeTaskMeta {
return leaderNodeId.get();
}
- public void setLeaderNodeId(int leaderNodeId) {
+ public void setLeaderNodeId(final int leaderNodeId) {
this.leaderNodeId.set(leaderNodeId);
}
@@ -87,11 +87,16 @@ public class PipeTaskMeta {
return exceptionMessages.toString();
}
- public synchronized void trackExceptionMessage(PipeRuntimeException exceptionMessage) {
+ public synchronized void trackExceptionMessage(final PipeRuntimeException exceptionMessage) {
+ // Only keep the newest exception message to avoid excess rpc payload and
+ // show pipe response
+ // Here we still keep the map form to allow compatibility with legacy versions
+ exceptionMessages.clear();
exceptionMessages.put(exceptionMessage, exceptionMessage);
}
- public synchronized boolean containsExceptionMessage(PipeRuntimeException exceptionMessage) {
+ public synchronized boolean containsExceptionMessage(
+ final PipeRuntimeException exceptionMessage) {
return exceptionMessages.containsKey(exceptionMessage);
}
@@ -103,7 +108,7 @@ public class PipeTaskMeta {
exceptionMessages.clear();
}
- public synchronized void serialize(OutputStream outputStream) throws IOException {
+ public synchronized void serialize(final OutputStream outputStream) throws IOException {
progressIndex.get().serialize(outputStream);
ReadWriteIOUtils.write(leaderNodeId.get(), outputStream);
@@ -114,7 +119,8 @@ public class PipeTaskMeta {
}
}
- public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version, ByteBuffer byteBuffer) {
+ public static PipeTaskMeta deserialize(
+ final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer);
final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer);
@@ -129,8 +135,8 @@ public class PipeTaskMeta {
return pipeTaskMeta;
}
- public static PipeTaskMeta deserialize(PipeRuntimeMetaVersion version, InputStream inputStream)
- throws IOException {
+ public static PipeTaskMeta deserialize(
+ final PipeRuntimeMetaVersion version, final InputStream inputStream) throws IOException {
final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(inputStream);
final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream);
@@ -146,14 +152,14 @@ public class PipeTaskMeta {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTaskMeta that = (PipeTaskMeta) obj;
+ final PipeTaskMeta that = (PipeTaskMeta) obj;
return progressIndex.get().equals(that.progressIndex.get())
&& leaderNodeId.get() == that.leaderNodeId.get()
&& exceptionMessages.equals(that.exceptionMessages);