You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/08/03 17:49:13 UTC
[iotdb] branch rel/1.2 updated: [IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 12d09e0e17e [IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)
12d09e0e17e is described below
commit 12d09e0e17e9aa8208293c6334acaeed403831c6
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Aug 4 01:49:06 2023 +0800
[IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)
This commit fixes 2 issues:
* A subscription running with the pattern option may cause OOM
How to reproduce:
1. Execute the following SQL:
```
create pipe test1
with extractor (
'extractor.history.enable'='false',
'extractor'='iotdb-extractor',
'extractor.realtime.mode'='log',
'extractor.pattern'='root'
)
with connector (
'connector'='iotdb-thrift-connector-v1',
'connector.node-urls'='127.0.0.1:6668'
);
start pipe test1;
```
2. Run the benchmark with 1 database, 10 devices, and 10 measurements.
* java.lang.UnsupportedOperationException: Unsupported PipeRuntimeException type 0, caused by a de/serialization issue in RecoverProgressIndex
<img width="1194" alt="image" src="https://github.com/apache/iotdb/assets/30497621/d2d35ee7-293b-4594-92f3-fc10b2aa8313">
(cherry picked from commit f0f168249b27e0eb50665cb65b47201cf7db3671)
---
.../PipeRealtimeDataRegionHybridExtractor.java | 4 +-
.../PipeRealtimeDataRegionLogExtractor.java | 2 +-
.../PipeRealtimeDataRegionTsFileExtractor.java | 2 +-
.../pipe/task/connection/BlockingPendingQueue.java | 22 ++++++++--
.../pipe/task/connection/PipeEventCollector.java | 48 +++++++++++++++++-----
.../resources/conf/iotdb-common.properties | 8 ++--
.../apache/iotdb/commons/conf/CommonConfig.java | 6 +--
.../consensus/index/impl/RecoverProgressIndex.java | 5 ++-
.../iotdb/commons/pipe/PipeMetaDeSerTest.java | 22 +++++++++-
9 files changed, 89 insertions(+), 30 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index bbddc5d03b6..ffefb13daf0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -81,7 +81,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
- && !pendingQueue.offer(event)) {
+ && !pendingQueue.waitedOffer(event)) {
LOGGER.warn(
"extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
+ "has reached capacity, discard tablet event {}, current state {}",
@@ -101,7 +101,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
state ->
state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state);
- if (!pendingQueue.offer(event)) {
+ if (!pendingQueue.waitedOffer(event)) {
LOGGER.warn(
"extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
+ "has reached capacity, discard TsFile event {}, current state {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 378af16857b..74ff0c533b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
return;
}
- if (!pendingQueue.offer(event)) {
+ if (!pendingQueue.waitedOffer(event)) {
LOGGER.warn(
"extract: pending queue of PipeRealtimeDataRegionLogExtractor {} "
+ "has reached capacity, discard tablet event {}, current state {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index ec351310c54..bab1ea2ec46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
return;
}
- if (!pendingQueue.offer(event)) {
+ if (!pendingQueue.waitedOffer(event)) {
LOGGER.warn(
"extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
+ "has reached capacity, discard TsFile event {}, current state {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 2787adc4d0a..f81d301b6f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -41,15 +41,29 @@ public abstract class BlockingPendingQueue<E extends Event> {
this.pendingQueue = pendingQueue;
}
- public boolean offer(E event) {
- boolean isAdded = false;
+ public boolean waitedOffer(E event) {
try {
- isAdded = pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+ return pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.info("pending queue offer is interrupted.", e);
Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ public boolean directOffer(E event) {
+ return pendingQueue.offer(event);
+ }
+
+ public boolean put(E event) {
+ try {
+ pendingQueue.put(event);
+ return true;
+ } catch (InterruptedException e) {
+ LOGGER.info("pending queue put is interrupted.", e);
+ Thread.currentThread().interrupt();
+ return false;
}
- return isAdded;
}
public E directPoll() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 498fc5aecfa..19ef3f6fd2f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -20,22 +20,22 @@
package org.apache.iotdb.db.pipe.task.connection;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.LinkedList;
import java.util.Queue;
public class PipeEventCollector implements EventCollector {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
+
private final BoundedBlockingPendingQueue<Event> pendingQueue;
- // buffer queue is used to store events that are not offered to pending queue
- // because the pending queue is full. when pending queue is full, pending queue
- // will notify tasks to stop extracting events, and buffer queue will be used to store
- // events before tasks are stopped. when pending queue is not full and tasks are
- // notified by the pending queue to start extracting events, buffer queue will be used to store
- // events before events in buffer queue are offered to pending queue.
private final Queue<Event> bufferQueue;
public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
@@ -51,16 +51,42 @@ public class PipeEventCollector implements EventCollector {
while (!bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
- if (pendingQueue.offer(bufferedEvent)) {
+ // Try to put already buffered events into pending queue, if pending queue is full, wait for
+ // pending queue to be available with timeout.
+ if (pendingQueue.waitedOffer(bufferedEvent)) {
bufferQueue.poll();
} else {
- bufferQueue.offer(event);
- return;
+ // If timeout, we judge whether the new event is a PipeRawTabletInsertionEvent. If it is,
+ // we wait for pending queue to be available without timeout until the pending queue is
+ // available. We don't put PipeRawTabletInsertionEvent into buffer queue, because it is
+ // memory consuming, holding too many PipeRawTabletInsertionEvent in buffer queue may cause
+ // OOM.
+ if (event instanceof PipeRawTabletInsertionEvent) {
+ if (pendingQueue.put(bufferedEvent)) {
+ bufferQueue.poll();
+ } else {
+ LOGGER.warn("interrupted when putting event into pending queue, event: {}", event);
+ bufferQueue.offer(event);
+ return;
+ }
+ } else {
+ bufferQueue.offer(event);
+ return;
+ }
}
}
- if (!pendingQueue.offer(event)) {
- bufferQueue.offer(event);
+ if (!pendingQueue.waitedOffer(event)) {
+ // PipeRawTabletInsertionEvent is memory consuming, so we should not put it into buffer queue
+ // when pending queue is full. Otherwise, it may cause OOM.
+ if (event instanceof PipeRawTabletInsertionEvent) {
+ if (!pendingQueue.put(event)) {
+ LOGGER.warn("interrupted when putting event into pending queue, event: {}", event);
+ bufferQueue.offer(event);
+ }
+ } else {
+ bufferQueue.offer(event);
+ }
}
}
}
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 08cc77c3c35..3524973a789 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -962,7 +962,7 @@ cluster_name=defaultCluster
# pipe_hardlink_wal_enabled=false
# The row size of tablets created in pipe transfer.
-# pipe_data_structure_tablet_row_size=65536
+# pipe_data_structure_tablet_row_size=16384
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# pipe_subtask_executor_max_thread_num=5
@@ -983,11 +983,11 @@ cluster_name=defaultCluster
# pipe_extractor_matcher_cache_size=1024
# The capacity for the number of tablet events that can be stored in the pending queue of the hybrid realtime extractor.
-# pipe_extractor_pending_queue_capacity=128
+# pipe_extractor_pending_queue_capacity=16
# The limit for the number of tablet events that can be held in the pending queue of the hybrid realtime extractor.
# Noted that: this should be less than or equals to realtimeExtractorPendingQueueCapacity
-# pipe_extractor_pending_queue_tablet_limit=64
+# pipe_extractor_pending_queue_tablet_limit=8
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
@@ -999,7 +999,7 @@ cluster_name=defaultCluster
# pipe_connector_retry_interval_ms=1000
# The size of the pending queue for the PipeConnector to store the events.
-# pipe_connector_pending_queue_size=1024
+# pipe_connector_pending_queue_size=16
# If the thrift RPC compression is enabled.
# pipe_async_connector_rpc_thrift_compression_enabled=false
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6405201d26d..f4f5905070b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -158,14 +158,14 @@ public class CommonConfig {
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private int pipeExtractorMatcherCacheSize = 1024;
- private int pipeExtractorPendingQueueCapacity = 128;
+ private int pipeExtractorPendingQueueCapacity = 16;
private int pipeExtractorPendingQueueTabletLimit = pipeExtractorPendingQueueCapacity / 2;
- private int pipeDataStructureTabletRowSize = 65536;
+ private int pipeDataStructureTabletRowSize = 16384;
private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
- private int pipeConnectorPendingQueueSize = 1024;
+ private int pipeConnectorPendingQueueSize = 16;
private boolean pipeAsyncConnectorRPCThriftCompressionEnabled = false;
private int pipeAsyncConnectorSelectorNumber = 1;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index 97be1e4be58..9c03edbc2b4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -189,7 +189,7 @@ public class RecoverProgressIndex implements ProgressIndex {
for (int i = 0; i < size; i++) {
final int dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
final SimpleProgressIndex simpleProgressIndex =
- SimpleProgressIndex.deserializeFrom(byteBuffer);
+ (SimpleProgressIndex) ProgressIndexType.deserializeFrom(byteBuffer);
recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
}
return recoverProgressIndex;
@@ -200,7 +200,8 @@ public class RecoverProgressIndex implements ProgressIndex {
final int size = ReadWriteIOUtils.readInt(stream);
for (int i = 0; i < size; i++) {
final int dataNodeId = ReadWriteIOUtils.readInt(stream);
- final SimpleProgressIndex simpleProgressIndex = SimpleProgressIndex.deserializeFrom(stream);
+ final SimpleProgressIndex simpleProgressIndex =
+ (SimpleProgressIndex) ProgressIndexType.deserializeFrom(stream);
recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
}
return recoverProgressIndex;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
index 89d3fcf989a..56b25867e21 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/PipeMetaDeSerTest.java
@@ -21,8 +21,11 @@ package org.apache.iotdb.commons.pipe;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
@@ -62,16 +65,31 @@ public class PipeMetaDeSerTest {
PipeStaticMeta pipeStaticMeta1 = PipeStaticMeta.deserialize(staticByteBuffer);
Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
+ HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(1, 2));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new SimpleProgressIndex(2, 4));
+ hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new IoTProgressIndex(3, 6L));
+
PipeRuntimeMeta pipeRuntimeMeta =
new PipeRuntimeMeta(
new HashMap() {
{
put(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
new PipeTaskMeta(new MinimumProgressIndex(), 987));
put(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 234),
new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789));
+ put(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 345),
+ new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789));
+ put(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+ new PipeTaskMeta(hybridProgressIndex, 789));
+ put(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 567),
+ new PipeTaskMeta(
+ new RecoverProgressIndex(1, new SimpleProgressIndex(1, 9)), 123));
}
});
ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();