You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/08/03 17:49:13 UTC

[iotdb] branch rel/1.2 updated: [IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 12d09e0e17e [IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)
12d09e0e17e is described below

commit 12d09e0e17e9aa8208293c6334acaeed403831c6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Aug 4 01:49:06 2023 +0800

    [IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)
    
    This commit fixes 2 issues:
    
    * Subscription running with the pattern option may cause OOM
    
      How to reproduce:
    
      1. execute sql:
      ```
      create pipe test1
      with extractor (
        'extractor.history.enable'='false',
        'extractor'='iotdb-extractor',
        'extractor.realtime.mode'='log',
        'extractor.pattern'='root'
      )
      with connector (
        'connector'='iotdb-thrift-connector-v1',
        'connector.node-urls'='127.0.0.1:6668'
      );
    
      start pipe test1;
      ```
    
      2. run benchmark: 1 database, 10 devices, 10 measurements.
    
    * "java.lang.UnsupportedOperationException: Unsupported PipeRuntimeException type 0", caused by a serialization/deserialization (de/ser) issue in RecoverProgressIndex
      <img width="1194" alt="image" src="https://github.com/apache/iotdb/assets/30497621/d2d35ee7-293b-4594-92f3-fc10b2aa8313">
    
    (cherry picked from commit f0f168249b27e0eb50665cb65b47201cf7db3671)
---
 .../PipeRealtimeDataRegionHybridExtractor.java     |  4 +-
 .../PipeRealtimeDataRegionLogExtractor.java        |  2 +-
 .../PipeRealtimeDataRegionTsFileExtractor.java     |  2 +-
 .../pipe/task/connection/BlockingPendingQueue.java | 22 ++++++++--
 .../pipe/task/connection/PipeEventCollector.java   | 48 +++++++++++++++++-----
 .../resources/conf/iotdb-common.properties         |  8 ++--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  6 +--
 .../consensus/index/impl/RecoverProgressIndex.java |  5 ++-
 .../iotdb/commons/pipe/PipeMetaDeSerTest.java      | 22 +++++++++-
 9 files changed, 89 insertions(+), 30 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index bbddc5d03b6..ffefb13daf0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -81,7 +81,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
     }
 
     if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
-        && !pendingQueue.offer(event)) {
+        && !pendingQueue.waitedOffer(event)) {
       LOGGER.warn(
           "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
               + "has reached capacity, discard tablet event {}, current state {}",
@@ -101,7 +101,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
             state ->
                 state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state);
 
-    if (!pendingQueue.offer(event)) {
+    if (!pendingQueue.waitedOffer(event)) {
       LOGGER.warn(
           "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
               + "has reached capacity, discard TsFile event {}, current state {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 378af16857b..74ff0c533b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
       return;
     }
 
-    if (!pendingQueue.offer(event)) {
+    if (!pendingQueue.waitedOffer(event)) {
       LOGGER.warn(
           "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} "
               + "has reached capacity, discard tablet event {}, current state {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index ec351310c54..bab1ea2ec46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
       return;
     }
 
-    if (!pendingQueue.offer(event)) {
+    if (!pendingQueue.waitedOffer(event)) {
       LOGGER.warn(
           "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
               + "has reached capacity, discard TsFile event {}, current state {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 2787adc4d0a..f81d301b6f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -41,15 +41,29 @@ public abstract class BlockingPendingQueue<E extends Event> {
     this.pendingQueue = pendingQueue;
   }
 
-  public boolean offer(E event) {
-    boolean isAdded = false;
+  public boolean waitedOffer(E event) {
     try {
-      isAdded = pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+      return pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       LOGGER.info("pending queue offer is interrupted.", e);
       Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+
+  public boolean directOffer(E event) {
+    return pendingQueue.offer(event);
+  }
+
+  public boolean put(E event) {
+    try {
+      pendingQueue.put(event);
+      return true;
+    } catch (InterruptedException e) {
+      LOGGER.info("pending queue put is interrupted.", e);
+      Thread.currentThread().interrupt();
+      return false;
     }
-    return isAdded;
   }
 
   public E directPoll() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 498fc5aecfa..19ef3f6fd2f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -20,22 +20,22 @@
 package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.LinkedList;
 import java.util.Queue;
 
 public class PipeEventCollector implements EventCollector {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
+
   private final BoundedBlockingPendingQueue<Event> pendingQueue;
 
-  // buffer queue is used to store events that are not offered to pending queue
-  // because the pending queue is full. when pending queue is full, pending queue
-  // will notify tasks to stop extracting events, and buffer queue will be used to store
-  // events before tasks are stopped. when pending queue is not full and tasks are
-  // notified by the pending queue to start extracting events, buffer queue will be used to store
-  // events before events in buffer queue are offered to pending queue.
   private final Queue<Event> bufferQueue;
 
   public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
@@ -51,16 +51,42 @@ public class PipeEventCollector implements EventCollector {
 
     while (!bufferQueue.isEmpty()) {
       final Event bufferedEvent = bufferQueue.peek();
-      if (pendingQueue.offer(bufferedEvent)) {
+      // Try to put already buffered events into pending queue, if pending queue is full, wait for
+      // pending queue to be available with timeout.
+      if (pendingQueue.waitedOffer(bufferedEvent)) {
         bufferQueue.poll();
       } else {
-        bufferQueue.offer(event);
-        return;
+        // If timeout, we judge whether the new event is a PipeRawTabletInsertionEvent. If it is,
+        // we wait for pending queue to be available without timeout until the pending queue is
+        // available. We don't put PipeRawTabletInsertionEvent into buffer queue, because it is
+        // memory consuming, holding too many PipeRawTabletInsertionEvent in buffer queue may cause
+        // OOM.
+        if (event instanceof PipeRawTabletInsertionEvent) {
+          if (pendingQueue.put(bufferedEvent)) {
+            bufferQueue.poll();
+          } else {
+            LOGGER.warn("interrupted when putting event into pending queue, event: {}", event);
+            bufferQueue.offer(event);
+            return;
+          }
+        } else {
+          bufferQueue.offer(event);
+          return;
+        }
       }
     }
 
-    if (!pendingQueue.offer(event)) {
-      bufferQueue.offer(event);
+    if (!pendingQueue.waitedOffer(event)) {
+      // PipeRawTabletInsertionEvent is memory consuming, so we should not put it into buffer queue
+      // when pending queue is full. Otherwise, it may cause OOM.
+      if (event instanceof PipeRawTabletInsertionEvent) {
+        if (!pendingQueue.put(event)) {
+          LOGGER.warn("interrupted when putting event into pending queue, event: {}", event);
+          bufferQueue.offer(event);
+        }
+      } else {
+        bufferQueue.offer(event);
+      }
     }
   }
 }
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 08cc77c3c35..3524973a789 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -962,7 +962,7 @@ cluster_name=defaultCluster
 # pipe_hardlink_wal_enabled=false
 
 # The row size of tablets created in pipe transfer.
-# pipe_data_structure_tablet_row_size=65536
+# pipe_data_structure_tablet_row_size=16384
 
 # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
 # pipe_subtask_executor_max_thread_num=5
@@ -983,11 +983,11 @@ cluster_name=defaultCluster
 # pipe_extractor_matcher_cache_size=1024
 
 # The capacity for the number of tablet events that can be stored in the pending queue of the hybrid realtime extractor.
-# pipe_extractor_pending_queue_capacity=128
+# pipe_extractor_pending_queue_capacity=16
 
 # The limit for the number of tablet events that can be held in the pending queue of the hybrid realtime extractor.
 # Noted that: this should be less than or equals to realtimeExtractorPendingQueueCapacity
-# pipe_extractor_pending_queue_tablet_limit=64
+# pipe_extractor_pending_queue_tablet_limit=8
 
 # The connection timeout (in milliseconds) for the thrift client.
 # pipe_connector_timeout_ms=900000
@@ -999,7 +999,7 @@ cluster_name=defaultCluster
 # pipe_connector_retry_interval_ms=1000
 
 # The size of the pending queue for the PipeConnector to store the events.
-# pipe_connector_pending_queue_size=1024
+# pipe_connector_pending_queue_size=16
 
 # If the thrift RPC compression is enabled.
 # pipe_async_connector_rpc_thrift_compression_enabled=false
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6405201d26d..f4f5905070b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -158,14 +158,14 @@ public class CommonConfig {
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private int pipeExtractorMatcherCacheSize = 1024;
-  private int pipeExtractorPendingQueueCapacity = 128;
+  private int pipeExtractorPendingQueueCapacity = 16;
   private int pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueCapacity / 2;
-  private int pipeDataStructureTabletRowSize = 65536;
+  private int pipeDataStructureTabletRowSize = 16384;
 
   private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
-  private int pipeConnectorPendingQueueSize = 1024;
+  private int pipeConnectorPendingQueueSize = 16;
 
   private boolean pipeAsyncConnectorRPCThriftCompressionEnabled = false;
   private int pipeAsyncConnectorSelectorNumber = 1;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index 97be1e4be58..9c03edbc2b4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -189,7 +189,7 @@ public class RecoverProgressIndex implements ProgressIndex {
     for (int i = 0; i < size; i++) {
       final int dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
       final SimpleProgressIndex simpleProgressIndex =
-          SimpleProgressIndex.deserializeFrom(byteBuffer);
+          (SimpleProgressIndex) ProgressIndexType.deserializeFrom(byteBuffer);
       recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
     }
     return recoverProgressIndex;
@@ -200,7 +200,8 @@ public class RecoverProgressIndex implements ProgressIndex {
     final int size = ReadWriteIOUtils.readInt(stream);
     for (int i = 0; i < size; i++) {
       final int dataNodeId = ReadWriteIOUtils.readInt(stream);
-      final SimpleProgressIndex simpleProgressIndex = SimpleProgressIndex.deserializeFrom(stream);
+      final SimpleProgressIndex simpleProgressIndex =
+          (SimpleProgressIndex) ProgressIndexType.deserializeFrom(stream);
       recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
     }
     return recoverProgressIndex;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
index 89d3fcf989a..56b25867e21 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
@@ -21,8 +21,11 @@ package org.apache.iotdb.commons.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
@@ -62,16 +65,31 @@ public class PipeMetaDeSerTest {
     PipeStaticMeta pipeStaticMeta1 = PipeStaticMeta.deserialize(staticByteBuffer);
     Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
 
+    HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(1, 2));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(2, 4));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new IoTProgressIndex(3, 6L));
+
     PipeRuntimeMeta pipeRuntimeMeta =
         new PipeRuntimeMeta(
             new HashMap() {
               {
                 put(
-                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
                     new PipeTaskMeta(new MinimumProgressIndex(), 987));
                 put(
-                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 234),
                     new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789));
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 345),
+                    new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789));
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+                    new PipeTaskMeta(hybridProgressIndex, 789));
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 567),
+                    new PipeTaskMeta(
+                        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 9)), 123));
               }
             });
     ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();