You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/12 13:20:15 UTC

[iotdb] branch rel/1.2 updated: [To rel/1.2][IOTDB-6148] Pipe: Swapped the start order of realtimeExtractor and historicalExtractor to avoid losing data (#10927)(#11127)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new c52260ca792 [To rel/1.2][IOTDB-6148] Pipe: Swapped the start order of realtimeExtractor and historicalExtractor to avoid losing data (#10927)(#11127)
c52260ca792 is described below

commit c52260ca792a690c22a63524e2d4e8836852334d
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Tue Sep 12 21:20:06 2023 +0800

    [To rel/1.2][IOTDB-6148] Pipe: Swapped the start order of realtimeExtractor and historicalExtractor to avoid losing data (#10927)(#11127)
    
    (cherry picked from commit 55bea9206184681460a6548bec947391a36f502c)
---
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |  3 ++-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |  2 +-
 .../protocol/websocket/WebSocketConnector.java     |  3 ++-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  | 12 ++++++------
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |  4 ++--
 .../PipeRealtimeDataRegionHybridExtractor.java     | 22 ++++++++++++++--------
 .../PipeRealtimeDataRegionLogExtractor.java        | 11 ++++++-----
 .../PipeRealtimeDataRegionTsFileExtractor.java     | 11 ++++++-----
 .../realtime/assigner/PipeDataRegionAssigner.java  |  2 +-
 .../pipe/task/connection/PipeEventCollector.java   |  2 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  6 +++---
 .../subtask/connector/PipeConnectorSubtask.java    |  2 +-
 .../subtask/processor/PipeProcessorSubtask.java    |  2 +-
 .../plan/node/write/PipeEnrichedInsertNode.java    |  1 +
 14 files changed, 47 insertions(+), 36 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index 0084af06f56..7347af503a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -68,7 +68,8 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends PipeTransferBatc
 
     for (final Event event : events) {
       if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName());
+        ((EnrichedEvent) event)
+            .decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName(), true);
       }
     }
     events.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index cb2c68ab680..376d85f354b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -513,7 +513,7 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
                     .ifPresent(
                         event ->
                             event.decreaseReferenceCount(
-                                IoTDBThriftAsyncConnector.class.getName()))));
+                                IoTDBThriftAsyncConnector.class.getName(), true))));
 
     while (!commitQueue.isEmpty()) {
       final Pair<Long, Runnable> committer = commitQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 8c76fdebb2a..879cd9e2423 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -128,7 +128,8 @@ public class WebSocketConnector implements PipeConnector {
                 Optional.ofNullable(enrichedEvent)
                     .ifPresent(
                         event ->
-                            event.decreaseReferenceCount(WebSocketConnector.class.getName()))));
+                            event.decreaseReferenceCount(
+                                WebSocketConnector.class.getName(), true))));
 
     while (!commitQueue.isEmpty()) {
       final Pair<Long, Runnable> committer = commitQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index d19d8a30c7b..abe58d38857 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -84,12 +84,14 @@ public abstract class EnrichedEvent implements Event {
    * @param holderMessage the message of the invoker
    * @return true if the reference count is decreased successfully, false otherwise
    */
-  public boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseReferenceCount(String holderMessage, boolean shouldReport) {
     boolean isSuccessful = true;
     synchronized (this) {
       if (referenceCount.get() == 1) {
         isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
-        reportProgress();
+        if (shouldReport) {
+          reportProgress();
+        }
       }
       referenceCount.decrementAndGet();
     }
@@ -97,9 +99,8 @@ public abstract class EnrichedEvent implements Event {
   }
 
   /**
-   * Decrease the reference count of this event to 0. The event can be recycled and the data stored
-   * in the event is not safe to use, the processing progress of the event should be reported to the
-   * pipe task meta.
+   * Decrease the reference count of this event to 0, to release the event directly. The event can
+   * be recycled and the data stored in the event is not safe to use.
    *
    * @param holderMessage the message of the invoker
    * @return true if the reference count is decreased successfully, false otherwise
@@ -109,7 +110,6 @@ public abstract class EnrichedEvent implements Event {
     synchronized (this) {
       if (referenceCount.get() >= 1) {
         isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
-        reportProgress();
       }
       referenceCount.set(0);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 4cfe03194e0..79994a3dd32 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -99,12 +99,12 @@ public class PipeRealtimeEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseReferenceCount(String holderMessage, boolean shouldReport) {
     // This method must be overridden, otherwise during the real-time data extraction stage, the
     // current PipeRealtimeEvent rather than the member variable EnrichedEvent will decrease
     // the reference count, resulting in errors in the reference count of the EnrichedEvent
     // contained in this PipeRealtimeEvent during the processor and connector stages.
-    return event.decreaseReferenceCount(holderMessage);
+    return event.decreaseReferenceCount(holderMessage, shouldReport);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 1fb4386f797..c39f58a8874 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -77,7 +77,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
     switch (state) {
       case USING_TSFILE:
         // Ignore the tablet event.
-        event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
         break;
       case EMPTY:
       case USING_TABLET:
@@ -94,7 +94,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
               .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
 
           // Ignore the tablet event.
-          event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+          event.decreaseReferenceCount(
+              PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
         }
         break;
       default:
@@ -130,12 +131,16 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
               .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
 
           // Ignore the tsfile event.
-          event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+          event.decreaseReferenceCount(
+              PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
         }
         break;
       case USING_TABLET:
-        // All the tablet events have been extracted, so we can ignore the tsfile event.
-        event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        // All the tablet events have been extracted, so we can ignore the tsFile event.
+        // Report this event for SimpleProgressIndex, which does not have progressIndex for wal.
+        // This report won't affect IoTProgressIndex since the previous wal events have been
+        // successfully transferred here.
+        event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), true);
         break;
       default:
         throw new UnsupportedOperationException(
@@ -157,7 +162,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
       // If the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
       // Besides, the printable event has higher priority to stay in queue to enable metrics report.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
       return;
     }
 
@@ -174,7 +179,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
       // pipe progress.
 
       // ignore this event.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
     }
   }
 
@@ -205,7 +210,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
                 eventToSupply.getClass(), this));
       }
 
-      realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+      realtimeEvent.decreaseReferenceCount(
+          PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
 
       if (suppliedEvent != null) {
         return suppliedEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 03ceb5ea7ba..bb796ec380a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -45,7 +45,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
     event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
 
     if (!(event.getEvent() instanceof TabletInsertionEvent)) {
-      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
       return;
     }
 
@@ -61,7 +61,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
       PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
 
       // ignore this event.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
     }
   }
 
@@ -77,7 +77,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
       // If the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
       // Besides, the printable event has higher priority to stay in queue to enable metrics report.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
       return;
     }
 
@@ -94,7 +94,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
       // pipe progress.
 
       // ignore this event.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
     }
   }
 
@@ -132,7 +132,8 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
         PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      realtimeEvent.decreaseReferenceCount(
+          PipeRealtimeDataRegionLogExtractor.class.getName(), false);
 
       if (suppliedEvent != null) {
         return suppliedEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index d2d79d9951f..cfb219702c8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -45,7 +45,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
     event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
 
     if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
-      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
       return;
     }
 
@@ -61,7 +61,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
       PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
 
       // Ignore the event.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
     }
   }
 
@@ -77,7 +77,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
       // If the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
       // Besides, the printable event has higher priority to stay in queue to enable metrics report.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
       return;
     }
 
@@ -94,7 +94,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
       // pipe progress.
 
       // Ignore the event.
-      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
     }
   }
 
@@ -132,7 +132,8 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
         PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
       }
 
-      realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      realtimeEvent.decreaseReferenceCount(
+          PipeRealtimeDataRegionTsFileExtractor.class.getName(), false);
 
       if (suppliedEvent != null) {
         return suppliedEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index c50e15bc392..7ac257fcc74 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -72,7 +72,7 @@ public class PipeDataRegionAssigner {
               }
             });
     event.gcSchemaInfo();
-    event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
+    event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), false);
   }
 
   public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index daa164a1ad0..a1e5fab5fd6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -58,7 +58,7 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
         // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because they may cause OOM.
         if (event instanceof PipeHeartbeatEvent
             && bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
-          ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName());
+          ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
         } else {
           bufferQueue.offer(event);
         }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index bdc494e7622..ed780c89fbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -193,13 +193,13 @@ public abstract class PipeSubtask
 
   @Override
   public synchronized void close() {
-    releaseLastEvent();
+    releaseLastEvent(false);
   }
 
-  protected void releaseLastEvent() {
+  protected void releaseLastEvent(boolean shouldReport) {
     if (lastEvent != null) {
       if (lastEvent instanceof EnrichedEvent) {
-        ((EnrichedEvent) lastEvent).decreaseReferenceCount(this.getClass().getName());
+        ((EnrichedEvent) lastEvent).decreaseReferenceCount(this.getClass().getName(), shouldReport);
       }
       lastEvent = null;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 4422ef2accc..d6732e590d1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -116,7 +116,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
         outputPipeConnector.transfer(event);
       }
 
-      releaseLastEvent();
+      releaseLastEvent(true);
     } catch (PipeConnectionException e) {
       throw e;
     } catch (Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 0f1fc02a0c8..8f6d12d1b06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -110,7 +110,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
         }
       }
 
-      releaseLastEvent();
+      releaseLastEvent(true);
     } catch (Exception e) {
       throw new PipeException(
           "Error occurred during executing PipeProcessor#process, perhaps need to check "
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
index c2988beafe5..28d217176f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/PipeEnrichedInsertNode.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;