You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/08/28 13:34:40 UTC

[iotdb] branch pipe-reduce-he created (now 93283cec936)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch pipe-reduce-he
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 93283cec936 Pipe: drop HeartbeatEvent when pipe is stopped to avoid OOM

This branch includes the following new commits:

     new 93283cec936 Pipe: drop HeartbeatEvent when pipe is stopped to avoid OOM

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Pipe: drop HeartbeatEvent when pipe is stopped to avoid OOM

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-reduce-he
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 93283cec936db76d1590a176f00bfa7437a6addf
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Aug 28 21:34:04 2023 +0800

    Pipe: drop HeartbeatEvent when pipe is stopped to avoid OOM
---
 .../realtime/PipeRealtimeDataRegionHybridExtractor.java     |  9 ++++++++-
 .../realtime/PipeRealtimeDataRegionLogExtractor.java        |  9 ++++++++-
 .../realtime/PipeRealtimeDataRegionTsFileExtractor.java     |  7 +++++++
 .../iotdb/db/pipe/task/connection/BlockingPendingQueue.java |  2 +-
 .../iotdb/db/pipe/task/connection/PipeEventCollector.java   | 13 ++++++++++---
 .../pipe/task/connection/UnboundedBlockingPendingQueue.java | 12 ++++++++++--
 6 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index b9e8f275e28..edbc14e26d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -155,11 +155,18 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+      // if the last event in the pending queue is a heartbeat event, we should not extract any more
+      // heartbeat events to avoid OOM when the pipe is stopped.
+      event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+      return;
+    }
+
     if (!pendingQueue.waitedOffer(event)) {
       // this would not happen, but just in case.
       // pendingQueue is unbounded, so it should never reach capacity.
       LOGGER.error(
-          "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
+          "extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
               + "has reached capacity, discard heartbeat event {}",
           this,
           event);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 14e899300b8..fb3783dc224 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,11 +75,18 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+      // if the last event in the pending queue is a heartbeat event, we should not extract any more
+      // heartbeat events to avoid OOM when the pipe is stopped.
+      event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+      return;
+    }
+
     if (!pendingQueue.waitedOffer(event)) {
       // this would not happen, but just in case.
       // pendingQueue is unbounded, so it should never reach capacity.
       LOGGER.error(
-          "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
+          "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} "
               + "has reached capacity, discard heartbeat event {}",
           this,
           event);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 9546e35906d..5967b1bc87b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -75,6 +75,13 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+      // if the last event in the pending queue is a heartbeat event, we should not extract any more
+      // heartbeat events to avoid OOM when the pipe is stopped.
+      event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+      return;
+    }
+
     if (!pendingQueue.waitedOffer(event)) {
       // This would not happen, but just in case.
       // Pending is unbounded, so it should never reach capacity.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index f81d301b6f2..fc7e2cd6f5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -35,7 +35,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   private static final long MAX_BLOCKING_TIME_MS =
       PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
 
-  private final BlockingQueue<E> pendingQueue;
+  protected final BlockingQueue<E> pendingQueue;
 
   protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
     this.pendingQueue = pendingQueue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index bf57908c71b..772851305ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -20,17 +20,18 @@
 package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import java.util.Deque;
 import java.util.LinkedList;
-import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
   private final BoundedBlockingPendingQueue<Event> pendingQueue;
 
-  private final Queue<Event> bufferQueue;
+  private final Deque<Event> bufferQueue;
 
   public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
@@ -50,7 +51,13 @@ public class PipeEventCollector implements EventCollector {
       if (pendingQueue.waitedOffer(bufferedEvent)) {
         bufferQueue.poll();
       } else {
-        bufferQueue.offer(event);
+        // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because they may cause OOM.
+        if (event instanceof PipeHeartbeatEvent
+            && bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
+          ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName());
+        } else {
+          bufferQueue.offer(event);
+        }
         return;
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index dafb567e902..343621bbb4a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -21,11 +21,19 @@ package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.pipe.api.event.Event;
 
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
 
 public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
 
+  private final BlockingDeque<E> pendingDeque;
+
   public UnboundedBlockingPendingQueue() {
-    super(new LinkedBlockingQueue<>());
+    super(new LinkedBlockingDeque<>());
+    pendingDeque = (BlockingDeque<E>) pendingQueue;
+  }
+
+  public E peekLast() {
+    return pendingDeque.peekLast();
   }
 }