Posted to user@flink.apache.org by Andrey Bulgakov <ma...@andreiko.ru> on 2021/03/09 21:51:14 UTC

Extracting state keys for a very large RocksDB savepoint

Hi all,

I'm trying to use the State Processor API to extract all keys from a
RocksDB savepoint produced by an operator in a Flink streaming job into CSV
files.
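The job looks roughly like this (a simplified sketch of the gist linked further down; the operator uid, paths, and key type here are placeholders, not the real values):

```java
// Simplified sketch: load a savepoint, read one operator's keyed state,
// and emit only the keys. Uid, paths, and the String key type are
// placeholders for illustration.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ExtractKeys {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(
            env, "hdfs:///path/to/savepoint", new RocksDBStateBackend("file:///tmp/rocksdb"));

        DataSet<String> keys =
            savepoint.readKeyedState("my-operator-uid", new KeyReader());
        keys.writeAsText("hdfs:///path/to/output");
        env.execute("extract-state-keys");
    }

    /** Emits each key; the state values are never touched. */
    static class KeyReader extends KeyedStateReaderFunction<String, String> {
        @Override
        public void readKey(String key, Context ctx, Collector<String> out) {
            out.collect(key);
        }
    }
}
```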

The problem is that the storage size of the savepoint is 30TB and I'm
running into garbage collection issues no matter how much memory or how many
CPU cores I allocate to the task managers, in whatever proportions. (I tried
allocating up to 120GB and 16 cores to each task manager.)

The same program and hardware configuration works with no problems for a
smaller savepoint (300GB), so this looks like some sort of scalability issue.

At the beginning the tasks spend a couple of hours in what I call "the
download phase". During that phase, heap usage as indicated by the metrics
and the Flink UI is at about 10% and everything is going great.

But at a certain point, heap usage for tasks coming out of the download phase
starts to go up and climbs to about 87%, as indicated in the Flink UI and
by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap
usage metric stops increasing and the JVM starts spending a lot of time
collecting garbage, keeping all CPUs 100% loaded. After some time in
this mode, the job crashes with "java.util.concurrent.TimeoutException:
Heartbeat of TaskManager with id container_1614821414188_0002_01_000035
timed out."

At all times the indicated managed memory usage is 0%, which seems
suspicious since RocksDB is supposed to be using it.

Also, judging by the absence of an application metric I emit in the state
processor operator, KeyedStateReaderFunction.readKey never gets called.

I would appreciate it if somebody could help answer some of my questions or
suggest a way I could further diagnose/fix this:

1. Is it normal that this overwhelming garbage collection starts long
before reaching 100% heap usage? At the time it happens, there's usually
10-15GB of heap showing up as available.

2. Am I correct to assume that even in batch mode Flink implements memory
back pressure and is supposed to slow down processing/allocations when it's
low on available heap memory?

3. If #2 is true, is it possible that due to some misconfiguration Flink
considers more heap space to be available than there actually is and keeps
allocating even though there's no more heap?

4. As an alternative to #3, is it possible that there are some unaccounted-for
heap allocations that are not shown in the UI or by the metric, and are
therefore not taken into account by the memory back pressure mechanism?

Here's the minimal code example that demonstrates the issue:
https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f

I'm running this on Flink 1.12.2 (and many earlier versions, too) with the
following base configuration and a parallelism of 80 (I tried lowering that to
have more resources available, too):
https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025

I tried many things with no success:
- reducing parallelism and making more resources available to each task
manager
- enabling object reuse and modifying the tuple mapper to avoid extra tuple
allocations
- manipulating memory ratios to allocate more memory as heap and managed
memory
- allocating 20% of memory to JVM overhead
- switching to the G1 garbage collector

Again, would appreciate any help with this.

-- 
With regards,
Andrey Bulgakov

Re: Extracting state keys for a very large RocksDB savepoint

Posted by Andrey Bulgakov <ma...@andreiko.ru>.
I guess there's no point in making it a KeyedProcessFunction, since it's not
going to have access to context, timers, or anything like that. So it can be
a simple InputFormat returning a DataSet of key and value tuples.
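Roughly, I'm picturing something like this (SavepointKeyReader and KvEntry are made-up placeholders standing in for the actual restore/deserialization logic; only the GenericInputFormat shape is real Flink API):

```java
// Skeleton of an InputFormat emitting (key, value) tuples straight from
// savepoint data. SavepointKeyReader and KvEntry are hypothetical
// placeholders; one input split maps to one key group.
import java.io.IOException;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.GenericInputSplit;

public class SavepointEntryInputFormat extends GenericInputFormat<Tuple2<byte[], byte[]>> {

    private transient SavepointKeyReader reader;  // placeholder: iterates one key group's entries

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        // Placeholder: open the state handle corresponding to this split.
        reader = SavepointKeyReader.forKeyGroup(split.getSplitNumber());
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !reader.hasNext();
    }

    @Override
    public Tuple2<byte[], byte[]> nextRecord(Tuple2<byte[], byte[]> reuse) throws IOException {
        KvEntry entry = reader.next();  // placeholder type
        reuse.f0 = entry.key();
        reuse.f1 = entry.value();
        return reuse;
    }
}
```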

On Wed, Mar 17, 2021 at 8:37 AM Andrey Bulgakov <ma...@andreiko.ru> wrote:

> [...]


-- 
With regards,
Andrey Bulgakov

Re: Extracting state keys for a very large RocksDB savepoint

Posted by Andrey Bulgakov <ma...@andreiko.ru>.
Hi Gordon,

I think my current implementation is very specific and wouldn't be that
valuable for the broader public.
But I think there's a potential version of it that could also retrieve
values from a savepoint in the same efficient way and that would be
something that other people might need.

I'm currently thinking about something similar to KeyedProcessFunction, but
taking a single state descriptor as a parameter instead of expecting a user
to "register" some of them in open(). The processElement() method would then
be invoked with both the key and the value.
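As a sketch, the shape I have in mind is something like this (purely hypothetical; no such interface exists in Flink today, and I've inlined a minimal stand-in for Flink's Collector to keep it self-contained):

```java
// Hypothetical sketch of the proposed API: the function is bound to exactly
// one state descriptor and invoked once per (key, value) pair found in the
// savepoint. Not real Flink API.
public abstract class StateEntryReaderFunction<K, V, OUT> {

    /** Minimal stand-in for Flink's Collector, to keep the sketch self-contained. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** Called for every key/value pair of the single configured state. */
    public abstract void processElement(K key, V value, Collector<OUT> out);
}
```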

One thing I'm not sure about is MapStateDescriptor, because map state stores
compound keys separately, and I'm not sure whether they are stored in sorted
order and can be passed to processElement() as a group, or whether they
should rather be passed separately.

I'll experiment with this for a while and try to figure out what works.
Please let me know if you have thoughts about this.

On Sun, Mar 14, 2021 at 11:55 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> [...]


-- 
With regards,
Andrey Bulgakov

Re: Extracting state keys for a very large RocksDB savepoint

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Andrey,

Perhaps the functionality you described is worth adding to the State
Processor API.
Your observation on how the library currently works is correct; basically it
tries to restore the state backends as-is.

In your current implementation, do you see it as worthwhile to try to add this?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Extracting state keys for a very large RocksDB savepoint

Posted by Andrey Bulgakov <ma...@andreiko.ru>.
If anyone is interested, I realized that the State Processor API was not the
right tool for this, since it spends a lot of time rebuilding RocksDB tables
and then a lot of memory trying to read from them. All I really needed was
the operator keys.

So I used SavepointLoader.loadSavepointMetadata to get KeyGroupsStateHandle
objects and built an InputFormat heavily based on the code I found
in RocksDBFullRestoreOperation.java.

It ended up working extremely quickly while keeping memory and CPU usage to
a minimum.
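In outline, the metadata-only walk looks like this (SavepointLoader, CheckpointMetadata, and the handle classes are Flink internals, so exact package names and signatures vary by version; the per-key-group deserialization itself is elided):

```java
// Sketch: read only the savepoint metadata, then walk the keyed-state
// handles. These classes are Flink internals (flink-state-processor-api /
// flink-runtime), so treat this as an illustration, not stable API.
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.state.api.runtime.SavepointLoader;

public class ListKeyGroupHandles {
    public static void main(String[] args) throws Exception {
        CheckpointMetadata metadata =
            SavepointLoader.loadSavepointMetadata("hdfs:///path/to/savepoint");
        for (OperatorState operator : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtask : operator.getStates()) {
                for (KeyedStateHandle handle : subtask.getManagedKeyedState()) {
                    if (handle instanceof KeyGroupsStateHandle) {
                        // Each KeyGroupsStateHandle covers a key-group range.
                        // Feed these to an InputFormat that deserializes keys
                        // the same way RocksDBFullRestoreOperation.java does,
                        // without ever building RocksDB tables.
                    }
                }
            }
        }
    }
}
```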

On Tue, Mar 9, 2021 at 1:51 PM Andrey Bulgakov <ma...@andreiko.ru> wrote:

> [...]


-- 
With regards,
Andrey Bulgakov