Posted to dev@flink.apache.org by "Daniel Hristov (Jira)" <ji...@apache.org> on 2022/03/18 14:17:00 UTC

[jira] [Created] (FLINK-26735) RMQSource queueing mechanism leads to OOM

Daniel Hristov created FLINK-26735:
--------------------------------------

             Summary: RMQSource queueing mechanism leads to OOM
                 Key: FLINK-26735
                 URL: https://issues.apache.org/jira/browse/FLINK-26735
             Project: Flink
          Issue Type: Bug
          Components: Connectors/ RabbitMQ
    Affects Versions: 1.14.4, 1.14.3
         Environment: Any environment. Let significantly more messages accumulate in the RabbitMQ queue than can fit in the TaskManager's heap. On startup, RMQSource tries to pull everything it can into its internal queue and runs out of memory quickly.
            Reporter: Daniel Hristov


RMQSource seems to use QueueingConsumer to get messages out of a Rabbit channel:

[https://github.com/apache/flink/blob/acef91e05ecaf986ac174f47749793ee12c783c0/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L259]

This consumer pushes all deliveries to an internal queue with a default capacity of Integer.MAX_VALUE. So if the consumers are slow, or the stream is (temporarily) slower than the producer, messages keep accumulating in the queue, eventually leading to OOM.

My suggested fix is:

  - Add RMQSource constructors that set a limited capacity for the internal queue

  - Change QueueingConsumer to call BlockingQueue::put (which blocks until capacity is available, thus implementing a basic backpressure mechanism) rather than Queue::add (which throws if capacity is exhausted)

I've tested the above on my clone of the consumer and am happy to contribute it back if agreed upon. 
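
For readers unfamiliar with the difference between the two calls, here is a minimal sketch using only java.util.concurrent. It is not the actual QueueingConsumer or RMQSource code. The class name BoundedQueueSketch, its method names, and the capacity of 1 are hypothetical and chosen purely for illustration. It shows that add throws once a bounded queue is full, while put parks the producer until a consumer frees a slot.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration class; not part of Flink or the RabbitMQ client.
public class BoundedQueueSketch {

    /** Returns true if add() on a full bounded queue throws IllegalStateException. */
    public static boolean addThrowsWhenFull() {
        // Capacity 1 is an arbitrary value chosen for the demonstration.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("first");
        try {
            queue.add("second"); // queue is full, so this should throw
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Returns true if put() blocks while the queue is full and completes after one take(). */
    public static boolean putBlocksUntilSpace() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            queue.put("first");

            // Stand-in for the thread that delivers messages into the queue.
            Thread producer = new Thread(() -> {
                try {
                    queue.put("second"); // blocks until the consumer takes "first"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // Give the producer time to reach put(); it should still be waiting.
            producer.join(200);
            boolean blockedWhileFull = producer.isAlive();

            // The consumer frees one slot, which lets the blocked put() finish.
            String taken = queue.take();
            producer.join(2000);

            return blockedWhileFull
                    && !producer.isAlive()
                    && "first".equals(taken)
                    && "second".equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("add throws when full: " + addThrowsWhenFull());
        System.out.println("put blocks until space: " + putBlocksUntilSpace());
    }
}
```

In a real consumer, the thread blocked in put would be the one delivering messages from the channel, so how long it may safely block would need to be checked against the client library's threading model.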



--
This message was sent by Atlassian Jira
(v8.20.1#820001)