Posted to dev@flink.apache.org by "kyledong@connect.hku.hk" <ky...@connect.hku.hk> on 2018/07/20 12:20:15 UTC

Idea for preventing OOM in continuous queries of Flink SQL by implementing a "spillable" StateTable

Hi everyone,
 
I am currently dealing with an OOM issue caused by continuous queries in Flink SQL (for example, a GROUP BY clause without a window). After looking into Flink's Runtime & Core modules, I found that as time goes by, the StateTables in the HeapKeyedStateBackend grow larger and larger, because there are too many distinct keys to keep in memory (and GC takes up most of the CPU time, often 20 seconds or more), eventually crashing the job or even the entire JobManager.
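For concreteness, the kind of query I mean looks roughly like the sketch below (the "clicks" table and its fields are made up for illustration, and the Table API calls are written from memory):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class UnboundedGroupBy {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // "clicks" is a hypothetical, unbounded source table registered elsewhere.
        // Without a window, the aggregation keeps one accumulator per distinct
        // user_id in keyed state forever, so the heap StateTable can only grow
        // as new keys arrive.
        Table result = tEnv.sqlQuery(
            "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");
    }
}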
 
Therefore, I am now thinking of implementing a "spillable" StateTable that could spill keys together with their values to disk (or to a state backend like RocksDB) when heap memory is about to be exhausted, and load the entries back once memory pressure eases. In contrast to using RocksDB alone, Flink's throughput would then not be affected as long as no spill is needed (i.e. while heap memory is sufficient).
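To make the idea a bit more concrete, here is a very rough sketch of the kind of structure I have in mind (all names are hypothetical, nothing like this exists in Flink today):

// Hypothetical sketch only: a StateTable variant that keeps hot entries on
// heap and moves cold entries to a secondary store when a memory threshold
// is exceeded. None of these classes or methods exist in Flink; this is
// just to illustrate the intended behaviour.
public interface SpillableStateTable<K, N, S> {

    // Normal read path: look on heap first, fall back to the spill store.
    S get(K key, N namespace);

    // Normal write path: always lands on heap; may trigger spilling.
    void put(K key, N namespace, S value);

    // Called when heap usage crosses a high watermark (e.g. measured via
    // MemoryMXBean or an allocation counter): move the coldest entries to
    // the spill store (local disk, or a RocksDB instance).
    void spillColdEntries(long bytesToFree);

    // Called when memory pressure goes away again: promote entries back
    // so the hot path stays a pure in-memory lookup.
    void loadBackEntries(long bytesAvailable);
}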
 
I am not sure whether the Flink community already has plans for dealing with the OOM issue caused by a large number of keys and states on the heap (other than using RocksDB alone, whose throughput is much lower than the heap backend's even when heap memory would suffice), and whether my plan has any serious flaws that make it not worth doing, such as making the checkpointing process harder to keep consistent, or greatly reducing throughput because of the additional lookup cost.

I hope to get some feedback, and thank you in advance, especially Fabian who pointed me to this mailing list : )
 
Sincerely,
Kyle



kyledong@connect.hku.hk

Re: Idea for preventing OOM in continuous queries of Flink SQL by implementing a "spillable" StateTable

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

> 
> I am currently dealing with an OOM issue caused by continuous queries in Flink SQL (for example, a GROUP BY clause without a window). After looking into Flink's Runtime & Core modules, I found that as time goes by, the StateTables in the HeapKeyedStateBackend grow larger and larger, because there are too many distinct keys to keep in memory (and GC takes up most of the CPU time, often 20 seconds or more), eventually crashing the job or even the entire JobManager.

did you consider increasing the parallelism to solve the problem? If your machine has enough cores and memory, running more JVMs has the benefit of smaller per-JVM heap sizes. I also think that GC tuning and future progress in GC algorithms like Shenandoah can potentially bring improvements for this problem.
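Just to illustrate the scaling-out point (the parallelism number is made up, and the config key is from memory):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ScaleOutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With higher parallelism the key groups are spread over more parallel
        // subtasks, so each TaskManager JVM only holds its slice of the keyed
        // state and can run with a smaller heap (taskmanager.heap.size in
        // flink-conf.yaml), which usually also means shorter GC pauses per JVM.
        env.setParallelism(32);   // made-up number, pick based on cores/memory
        // ... build and execute the same SQL job as before ...
    }
}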

> 
> Therefore, I am now thinking of implementing a "spillable" StateTable that could spill keys together with their values to disk (or to a state backend like RocksDB) when heap memory is about to be exhausted, and load the entries back once memory pressure eases. In contrast to using RocksDB alone, Flink's throughput would then not be affected as long as no spill is needed (i.e. while heap memory is sufficient).
> 
> I am not sure whether the Flink community already has plans for dealing with the OOM issue caused by a large number of keys and states on the heap (other than using RocksDB alone, whose throughput is much lower than the heap backend's even when heap memory would suffice), and whether my plan has any serious flaws that make it not worth doing, such as making the checkpointing process harder to keep consistent, or greatly reducing throughput because of the additional lookup cost.
> 

We were also thinking about this, but the feature was never important/requested enough to make it onto the shortlist for a release. I think it is not as easy as "spilling" makes it sound. For example, how does the spilling deal with updates? Eventually you might need to write them back to disk, and you should have a strategy that will not kill you with random I/O. You would probably end up implementing something similar to RocksDB, which is why I could see an argument for wrapping the RocksDB backend with an in-memory cache rather than enhancing the heap backend with spilling. You also need to consider the use case and key-distribution pattern. Does it have hot/cold keys? Or maybe not, and just a lot of keys competing for a small cache? What cache replacement strategy makes sense, maybe some LRU-K or Clock?
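To sketch what I mean by wrapping the backend with a cache (purely illustrative; the SlowStore interface is a placeholder that just stands in for the RocksDB lookup, not a real Flink class):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: an LRU read/write cache in front of a slower
// key/value store (think: the RocksDB state backend).
public class CachedStore<K, V> {

    public interface SlowStore<K, V> {
        V get(K key);
        void put(K key, V value);
    }

    private final SlowStore<K, V> backing;
    private final int capacity;

    // accessOrder = true turns LinkedHashMap into an LRU structure;
    // removeEldestEntry evicts (and writes back) the least recently used entry.
    private final LinkedHashMap<K, V> cache;

    public CachedStore(SlowStore<K, V> backing, int capacity) {
        this.backing = backing;
        this.capacity = capacity;
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > CachedStore.this.capacity) {
                    // write back on eviction; see the write-through/write-back
                    // discussion below for why this matters for checkpoints
                    CachedStore.this.backing.put(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    public V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            value = backing.get(key);   // cache miss: pay the RocksDB cost
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }

    public void put(K key, V value) {
        cache.put(key, value);          // dirty entries live in the cache
    }
}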

Another aspect is when and how to sync between the cache/RocksDB or heap/spilling partition (e.g. write-through vs. write-back), so that the checkpoint is consistent and, at the same time, we don't get blocking behaviour when a checkpoint starts. For example, write-through still keeps a serializer overhead in your main processing loop or in some background thread (which might lead to latency/blocking if it cannot keep up), while write-back might stall checkpoints because changes need to be published first.
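Roughly, the difference in the write path would look like this (again just a sketch with placeholder maps; "backing" stands for RocksDB or the spill partition, not a real Flink class):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class WritePolicies<K, V> {

    private final Map<K, V> cache = new HashMap<>();
    private final Map<K, V> backing = new HashMap<>();   // stand-in for RocksDB / spill files
    private final Set<K> dirty = new HashSet<>();

    // Write-through: every update is serialized and pushed to the backing
    // store immediately. Checkpoints see a consistent backing store, but the
    // serialization cost sits on the hot path (or in a background thread
    // that has to keep up).
    public void putWriteThrough(K key, V value) {
        cache.put(key, value);
        backing.put(key, value);
    }

    // Write-back: updates stay on heap and are only marked dirty. Fast on the
    // hot path, but before a checkpoint can complete, all dirty entries must
    // be flushed, which can stall the checkpoint.
    public void putWriteBack(K key, V value) {
        cache.put(key, value);
        dirty.add(key);
    }

    public void flushBeforeCheckpoint() {
        for (K key : dirty) {
            backing.put(key, cache.get(key));
        }
        dirty.clear();
    }
}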

So yes, there are ideas floating around, but it is not as trivial as it might look at first, and it never became an important-enough feature because the alternatives (scaling out, RocksDB) seem good enough for most people.

Best,
Stefan