You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/12 13:20:15 UTC
[iotdb] branch rel/1.2 updated: [To rel/1.2][IOTDB-6148] Pipe: Swapped the start order of realtimeExtractor and historicalExtractor to avoid losing data (#10927)(#11127)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new c52260ca792 [To rel/1.2][IOTDB-6148] Pipe: Swapped the start order of realtimeExtractor and historicalExtractor to avoid losing data (#10927)(#11127)
c52260ca792 is described below
commit c52260ca792a690c22a63524e2d4e8836852334d
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Tue Sep 12 21:20:06 2023 +0800
[To rel/1.2][IOTDB-6148] Pipe: Swapped the start order of realtimeExtractor and historicalExtractor to avoid losing data (#10927)(#11127)
(cherry picked from commit 55bea9206184681460a6548bec947391a36f502c)
---
...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 3 ++-
.../thrift/async/IoTDBThriftAsyncConnector.java | 2 +-
.../protocol/websocket/WebSocketConnector.java | 3 ++-
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 12 ++++++------
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 4 ++--
.../PipeRealtimeDataRegionHybridExtractor.java | 22 ++++++++++++++--------
.../PipeRealtimeDataRegionLogExtractor.java | 11 ++++++-----
.../PipeRealtimeDataRegionTsFileExtractor.java | 11 ++++++-----
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../pipe/task/connection/PipeEventCollector.java | 2 +-
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 6 +++---
.../subtask/connector/PipeConnectorSubtask.java | 2 +-
.../subtask/processor/PipeProcessorSubtask.java | 2 +-
.../plan/node/write/PipeEnrichedInsertNode.java | 1 +
14 files changed, 47 insertions(+), 36 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index 0084af06f56..7347af503a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -68,7 +68,8 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends PipeTransferBatc
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName());
+ ((EnrichedEvent) event)
+ .decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName(), true);
}
}
events.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index cb2c68ab680..376d85f354b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -513,7 +513,7 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
.ifPresent(
event ->
event.decreaseReferenceCount(
- IoTDBThriftAsyncConnector.class.getName()))));
+ IoTDBThriftAsyncConnector.class.getName(), true))));
while (!commitQueue.isEmpty()) {
final Pair<Long, Runnable> committer = commitQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 8c76fdebb2a..879cd9e2423 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -128,7 +128,8 @@ public class WebSocketConnector implements PipeConnector {
Optional.ofNullable(enrichedEvent)
.ifPresent(
event ->
- event.decreaseReferenceCount(WebSocketConnector.class.getName()))));
+ event.decreaseReferenceCount(
+ WebSocketConnector.class.getName(), true))));
while (!commitQueue.isEmpty()) {
final Pair<Long, Runnable> committer = commitQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index d19d8a30c7b..abe58d38857 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -84,12 +84,14 @@ public abstract class EnrichedEvent implements Event {
* @param holderMessage the message of the invoker
* @return true if the reference count is decreased successfully, false otherwise
*/
- public boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseReferenceCount(String holderMessage, boolean shouldReport) {
boolean isSuccessful = true;
synchronized (this) {
if (referenceCount.get() == 1) {
isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
- reportProgress();
+ if (shouldReport) {
+ reportProgress();
+ }
}
referenceCount.decrementAndGet();
}
@@ -97,9 +99,8 @@ public abstract class EnrichedEvent implements Event {
}
/**
- * Decrease the reference count of this event to 0. The event can be recycled and the data stored
- * in the event is not safe to use, the processing progress of the event should be reported to the
- * pipe task meta.
+ * Decrease the reference count of this event to 0, to release the event directly. The event can
+ * be recycled and the data stored in the event is not safe to use.
*
* @param holderMessage the message of the invoker
* @return true if the reference count is decreased successfully, false otherwise
@@ -109,7 +110,6 @@ public abstract class EnrichedEvent implements Event {
synchronized (this) {
if (referenceCount.get() >= 1) {
isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
- reportProgress();
}
referenceCount.set(0);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 4cfe03194e0..79994a3dd32 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -99,12 +99,12 @@ public class PipeRealtimeEvent extends EnrichedEvent {
}
@Override
- public boolean decreaseReferenceCount(String holderMessage) {
+ public boolean decreaseReferenceCount(String holderMessage, boolean shouldReport) {
// This method must be overridden, otherwise during the real-time data extraction stage, the
// current PipeRealtimeEvent rather than the member variable EnrichedEvent will decrease
// the reference count, resulting in errors in the reference count of the EnrichedEvent
// contained in this PipeRealtimeEvent during the processor and connector stages.
- return event.decreaseReferenceCount(holderMessage);
+ return event.decreaseReferenceCount(holderMessage, shouldReport);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 1fb4386f797..c39f58a8874 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -77,7 +77,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
switch (state) {
case USING_TSFILE:
// Ignore the tablet event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
break;
case EMPTY:
case USING_TABLET:
@@ -94,7 +94,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
// Ignore the tablet event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ event.decreaseReferenceCount(
+ PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
}
break;
default:
@@ -130,12 +131,16 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
// Ignore the tsfile event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ event.decreaseReferenceCount(
+ PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
}
break;
case USING_TABLET:
- // All the tablet events have been extracted, so we can ignore the tsfile event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ // All the tablet events have been extracted, so we can ignore the tsFile event.
+ // Report this event for SimpleProgressIndex, which does not have progressIndex for wal.
+ // This report won't affect IoTProgressIndex since the previous wal events have been
+ // successfully transferred here.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), true);
break;
default:
throw new UnsupportedOperationException(
@@ -157,7 +162,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
// If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
// Besides, the printable event has higher priority to stay in queue to enable metrics report.
- event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
return;
}
@@ -174,7 +179,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
// pipe progress.
// ignore this event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
}
}
@@ -205,7 +210,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
eventToSupply.getClass(), this));
}
- realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ realtimeEvent.decreaseReferenceCount(
+ PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
if (suppliedEvent != null) {
return suppliedEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 03ceb5ea7ba..bb796ec380a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -45,7 +45,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
if (!(event.getEvent() instanceof TabletInsertionEvent)) {
- event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
return;
}
@@ -61,7 +61,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
// ignore this event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
}
}
@@ -77,7 +77,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
// If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
// Besides, the printable event has higher priority to stay in queue to enable metrics report.
- event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
return;
}
@@ -94,7 +94,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
// pipe progress.
// ignore this event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
}
}
@@ -132,7 +132,8 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
}
- realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+ realtimeEvent.decreaseReferenceCount(
+ PipeRealtimeDataRegionLogExtractor.class.getName(), false);
if (suppliedEvent != null) {
return suppliedEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index d2d79d9951f..cfb219702c8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -45,7 +45,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
- event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
return;
}
@@ -61,7 +61,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
// Ignore the event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
}
}
@@ -77,7 +77,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
// If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
// Besides, the printable event has higher priority to stay in queue to enable metrics report.
- event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
return;
}
@@ -94,7 +94,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
// pipe progress.
// Ignore the event.
- event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+ event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
}
}
@@ -132,7 +132,8 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
}
- realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+ realtimeEvent.decreaseReferenceCount(
+ PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
if (suppliedEvent != null) {
return suppliedEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index c50e15bc392..7ac257fcc74 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -72,7 +72,7 @@ public class PipeDataRegionAssigner {
}
});
event.gcSchemaInfo();
- event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), false);
}
public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index daa164a1ad0..a1e5fab5fd6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -58,7 +58,7 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
// We can NOT keep too many PipeHeartbeatEvent in bufferQueue because they may cause OOM.
if (event instanceof PipeHeartbeatEvent
&& bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
- ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName());
+ ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
} else {
bufferQueue.offer(event);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index bdc494e7622..ed780c89fbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -193,13 +193,13 @@ public abstract class PipeSubtask
@Override
public synchronized void close() {
- releaseLastEvent();
+ releaseLastEvent(false);
}
- protected void releaseLastEvent() {
+ protected void releaseLastEvent(boolean shouldReport) {
if (lastEvent != null) {
if (lastEvent instanceof EnrichedEvent) {
- ((EnrichedEvent) lastEvent).decreaseReferenceCount(this.getClass().getName());
+ ((EnrichedEvent) lastEvent).decreaseReferenceCount(this.getClass().getName(), shouldReport);
}
lastEvent = null;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 4422ef2accc..d6732e590d1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -116,7 +116,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
outputPipeConnector.transfer(event);
}
- releaseLastEvent();
+ releaseLastEvent(true);
} catch (PipeConnectionException e) {
throw e;
} catch (Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 0f1fc02a0c8..8f6d12d1b06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -110,7 +110,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
}
}
- releaseLastEvent();
+ releaseLastEvent(true);
} catch (Exception e) {
throw new PipeException(
"Error occurred during executing PipeProcessor#process, perhaps need to check "
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
index c2988beafe5..28d217176f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;