Posted to user@flink.apache.org by Ran Zhang <ra...@pinterest.com> on 2020/01/30 01:38:51 UTC

FsStateBackend vs RocksDBStateBackend

Hi all,

We have a Flink app that uses a KeyedProcessFunction. The function keeps a
ValueState (of a TreeSet), which the processElement method needs to read and
update. We tried RocksDB as our state backend, but the performance was poor;
our intuition is that this is caused by the serialization / deserialization
on each processElement call. We then switched to FsStateBackend (which,
according to the docs, keeps in-flight data in the TaskManager’s memory),
and that resolved the performance issue. *So we would like to understand
the tradeoffs in choosing between these two state backends.* Our checkpoint
size is 200 - 300 GB in steady state. So far we know that one benefit of
RocksDB is that it supports incremental checkpoints, but we would love to
know what else we lose by choosing FsStateBackend.

Thanks a lot!
Ran Zhang

Re: FsStateBackend vs RocksDBStateBackend

Posted by Chen Qin <qi...@gmail.com>.
Hi Robert,

Comments inline.

> On Feb 28, 2020, at 2:51 AM, Robert Metzger <rm...@apache.org> wrote:
> 
> Sorry for the late reply.
> 
> There's not much you can do at the moment, as Flink needs to sync on the checkpoint barriers.
> There's something in the making for addressing the issue soon: https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
If I understand correctly, we need to make sure that when the state snapshot is taken, in-flight records between barriers from different channels have been "materialized" (processed and pushed downstream before the snapshot is taken).

More specifically: if we honor watermark progression and the operator snapshot (barriers aligned), and drain the records that were processed out of order before actually taking the snapshot, will it work correctly? Details here: https://github.com/apache/flink/pull/11267/files

> Did you try out using the FsStateBackend?
As far as we know, it is a skewed key that makes the RocksDB state updates slow. Ran can probably share more at Flink Forward 2020 :)
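
A toy illustration of why a skewed key hurts, in plain Java rather than Flink code. It assumes keys are assigned to subtasks by a simple modulo of the hash, which is a simplification (Flink's real assignment goes through key groups and differs in detail), and the class and method names are made up for this sketch. Since each subtask is single-threaded, the busiest subtask bounds the throughput of the whole job:

```java
import java.util.Arrays;

/** Toy model (not Flink code): how one hot key concentrates load on one subtask. */
final class KeySkewModel {

    /** Sums records per subtask, assigning integer key k to subtask hash(k) mod parallelism. */
    static long[] recordsPerSubtask(long[] recordsPerKey, int parallelism) {
        long[] load = new long[parallelism];
        for (int key = 0; key < recordsPerKey.length; key++) {
            int subtask = Math.floorMod(Integer.hashCode(key), parallelism);
            load[subtask] += recordsPerKey[key];
        }
        return load;
    }

    /** Ratio of the busiest subtask's load to the mean load; 1.0 means perfectly even. */
    static double skewFactor(long[] load) {
        long max = Arrays.stream(load).max().orElse(0);
        double mean = Arrays.stream(load).average().orElse(0);
        return mean == 0 ? 0 : max / mean;
    }

    public static void main(String[] args) {
        long[] recordsPerKey = new long[100];
        Arrays.fill(recordsPerKey, 1L);   // 100 keys with one record each
        recordsPerKey[7] += 100;          // one hypothetical hot key
        long[] load = recordsPerSubtask(recordsPerKey, 4);
        System.out.println(Arrays.toString(load) + " skew=" + skewFactor(load));
    }
}
```

With these made-up numbers, three subtasks handle 25 records each while the one owning the hot key handles 125, so the other subtasks mostly wait on it.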
> If you are going to stick with rocks, I would recommend to understand what exactly causes the poor performance. I see the following areas:
> - serialization costs
> - disk / ssd speed
> - network speed (during checkpoint creation) (as Yu mentioned)
> - if you have asynchronous checkpoints enabled, they will also slow down the processing.
> 
> 
> On Sun, Feb 23, 2020 at 8:27 PM Chen Qin <cqin@pinterest.com> wrote:
> Just to follow up on this thread: it was actually caused by key skew. Given that a single subtask is single-threaded, 5% of slow processing causes the entire job to back-pressure on the RocksDBStateBackend.
> 
> Robert,
> 
> What is blocking us from enabling multi-threading in the processor? I recall it has something to do with barriers and record ordering. Can you share more insights on this?
> 
> Chen
> 
>> On Feb 21, 2020, at 4:56 AM, Robert Metzger <rmetzger@apache.org> wrote:
>> 
>> 
>> I would try the FsStateBackend in this scenario, as you have enough memory available.
>> 
>> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <ranzhang@pinterest.com> wrote:
>> Hi Gordon,
>> 
>> Thanks for your reply! Regarding state size - we are at 200-300gb but we have 120 parallelism which will make each task handle ~2 - 3 gb state. (when we submit the job we are setting tm memory to 15g.) In this scenario what will be the best fit for statebackend? 
>> 
>> Thanks,
>> Ran
>> 
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>> Hi Ran,
>> 
>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ranzhang@pinterest.com> wrote:
>> Hi all,
>> 
>> We have a Flink app that uses a KeyedProcessFunction, and in the function it requires a ValueState(of TreeSet) and the processElement method needs to access and update it. We tried to use RocksDB as our stateBackend but the performance is not good, and intuitively we think it was because of the serialization / deserialization on each processElement call.
>> 
>> As you have already pointed out, serialization behaviour is a major difference between the 2 state backends, and will directly impact performance due to the extra runtime overhead in RocksDB.
>> If you plan to continue using the RocksDB state backend, make sure to use MapState instead of ValueState where possible, since every access to the ValueState in the RocksDB backend requires serializing / deserializing the whole value.
>> For MapState, de-/serialization happens per K-V access. Whether or not this makes sense would of course depend on your state access pattern.
>>  
>> Then we tried to switch to use FsStateBackend (which keeps the in-flight data in the TaskManager’s memory according to doc), and it could resolve the performance issue. So we want to understand better what are the tradeoffs in choosing between these 2 stateBackend. Our checkpoint size is 200 - 300 GB in stable state. For now we know one benefits of RocksDB is it supports incremental checkpoint, but would love to know what else we are losing in choosing FsStateBackend.
>> 
>> As of now, feature-wise both backends support asynchronous snapshotting, state schema evolution, and access via the State Processor API.
>> In the end, the major factor for deciding between the two state backends would be your expected state size.
>> That being said, it could be possible in the future that savepoint formats for the backends are changed to be compatible, meaning that you will be able to switch between different backends upon restore [1].
>>  
>> 
>> Thanks a lot!
>> Ran Zhang
>> 
>> Cheers,
>> Gordon
>> 
>>  [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State

Re: FsStateBackend vs RocksDBStateBackend

Posted by Robert Metzger <rm...@apache.org>.
Sorry for the late reply.

There's not much you can do at the moment, as Flink needs to sync on the
checkpoint barriers.
There's something in the making for addressing the issue soon:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

Did you try out using the FsStateBackend?
If you are going to stick with RocksDB, I would recommend finding out what
exactly causes the poor performance. I see the following areas:
- serialization costs
- disk / ssd speed
- network speed (during checkpoint creation) (as Yu mentioned)
- if you have asynchronous checkpoints enabled, they will also slow down
the processing.


On Sun, Feb 23, 2020 at 8:27 PM Chen Qin <cq...@pinterest.com> wrote:

> Just to follow up on this thread: it was actually caused by key skew. Given
> that a single subtask is single-threaded, 5% of slow processing causes the
> entire job to back-pressure on the RocksDBStateBackend.
>
> Robert,
>
> What is blocking us from enabling multi-threading in the processor? I recall
> it has something to do with barriers and record ordering. Can you share more
> insights on this?
>
> Chen
>
> On Feb 21, 2020, at 4:56 AM, Robert Metzger <rm...@apache.org> wrote:
>
> 
> I would try the FsStateBackend in this scenario, as you have enough memory
> available.
>
> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <ra...@pinterest.com> wrote:
>
>> Hi Gordon,
>>
>> Thanks for your reply! Regarding state size - we are at 200-300gb but we
>> have 120 parallelism which will make each task handle ~2 - 3 gb state.
>> (when we submit the job we are setting tm memory to 15g.) In this scenario
>> what will be the best fit for statebackend?
>>
>> Thanks,
>> Ran
>>
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
>> wrote:
>>
>>> Hi Ran,
>>>
>>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ra...@pinterest.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> We have a Flink app that uses a KeyedProcessFunction, and in the
>>>> function it requires a ValueState(of TreeSet) and the processElement method
>>>> needs to access and update it. We tried to use RocksDB as our stateBackend
>>>> but the performance is not good, and intuitively we think it was because of
>>>> the serialization / deserialization on each processElement call.
>>>>
>>>
>>> As you have already pointed out, serialization behaviour is a major
>>> difference between the 2 state backends, and will directly impact
>>> performance due to the extra runtime overhead in RocksDB.
>>> If you plan to continue using the RocksDB state backend, make sure to
>>> use MapState instead of ValueState where possible, since every access to
>>> the ValueState in the RocksDB backend requires serializing / deserializing
>>> the whole value.
>>> For MapState, de-/serialization happens per K-V access. Whether or not
>>> this makes sense would of course depend on your state access pattern.
>>>
>>>
>>>> Then we tried to switch to use FsStateBackend (which keeps the
>>>> in-flight data in the TaskManager’s memory according to doc), and it could
>>>> resolve the performance issue. *So we want to understand better what
>>>> are the tradeoffs in choosing between these 2 stateBackend.* Our
>>>> checkpoint size is 200 - 300 GB in stable state. For now we know one
>>>> benefits of RocksDB is it supports incremental checkpoint, but would love
>>>> to know what else we are losing in choosing FsStateBackend.
>>>>
>>>
>>> As of now, feature-wise both backends support asynchronous snapshotting,
>>> state schema evolution, and access via the State Processor API.
>>> In the end, the major factor for deciding between the two state backends
>>> would be your expected state size.
>>> That being said, it could be possible in the future that savepoint
>>> formats for the backends are changed to be compatible, meaning that you
>>> will be able to switch between different backends upon restore [1].
>>>
>>>
>>>>
>>>> Thanks a lot!
>>>> Ran Zhang
>>>>
>>>
>>> Cheers,
>>> Gordon
>>>
>>>  [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>>>
>>

Re: FsStateBackend vs RocksDBStateBackend

Posted by Yu Li <ca...@gmail.com>.
Yes, FsStateBackend would be the best fit for state access performance in
this case. Just a reminder that FsStateBackend uploads the full dataset
to DFS during checkpointing, so please watch the network bandwidth usage
and make sure it doesn't become a new bottleneck.
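
A back-of-the-envelope way to check this, as a sketch only: divide the full checkpoint size by the time a checkpoint is allowed to take. The 200 - 300 GB figures come from this thread; the 10-minute window is purely an assumed number for illustration, and the helper names are hypothetical.

```java
/** Rough estimate (not Flink code) of the upload rate a full checkpoint needs. */
final class CheckpointBandwidth {

    /** Aggregate upload rate in GB/s needed to ship a full checkpoint within the window. */
    static double aggregateGbPerSecond(double checkpointGb, double windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive");
        }
        return checkpointGb / windowSeconds;
    }

    /** The same rate in Gbit/s, using decimal units (1 GB = 8 Gbit). */
    static double aggregateGbitPerSecond(double checkpointGb, double windowSeconds) {
        return aggregateGbPerSecond(checkpointGb, windowSeconds) * 8;
    }

    public static void main(String[] args) {
        double assumedWindowSeconds = 600; // assumption: 10 minutes per checkpoint
        for (double checkpointGb : new double[] {200, 300}) {
            System.out.printf("%.0f GB in %.0f s needs about %.2f Gbit/s in aggregate%n",
                    checkpointGb, assumedWindowSeconds,
                    aggregateGbitPerSecond(checkpointGb, assumedWindowSeconds));
        }
    }
}
```

Under that assumed window, 300 GB works out to 0.5 GB/s (4 Gbit/s) across the whole job, before accounting for any other traffic sharing the same links.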

Best Regards,
Yu


On Fri, 21 Feb 2020 at 20:56, Robert Metzger <rm...@apache.org> wrote:

> I would try the FsStateBackend in this scenario, as you have enough memory
> available.
>
> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <ra...@pinterest.com> wrote:
>
>> Hi Gordon,
>>
>> Thanks for your reply! Regarding state size - we are at 200-300gb but we
>> have 120 parallelism which will make each task handle ~2 - 3 gb state.
>> (when we submit the job we are setting tm memory to 15g.) In this scenario
>> what will be the best fit for statebackend?
>>
>> Thanks,
>> Ran
>>
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
>> wrote:
>>
>>> Hi Ran,
>>>
>>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ra...@pinterest.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> We have a Flink app that uses a KeyedProcessFunction, and in the
>>>> function it requires a ValueState(of TreeSet) and the processElement method
>>>> needs to access and update it. We tried to use RocksDB as our stateBackend
>>>> but the performance is not good, and intuitively we think it was because of
>>>> the serialization / deserialization on each processElement call.
>>>>
>>>
>>> As you have already pointed out, serialization behaviour is a major
>>> difference between the 2 state backends, and will directly impact
>>> performance due to the extra runtime overhead in RocksDB.
>>> If you plan to continue using the RocksDB state backend, make sure to
>>> use MapState instead of ValueState where possible, since every access to
>>> the ValueState in the RocksDB backend requires serializing / deserializing
>>> the whole value.
>>> For MapState, de-/serialization happens per K-V access. Whether or not
>>> this makes sense would of course depend on your state access pattern.
>>>
>>>
>>>> Then we tried to switch to use FsStateBackend (which keeps the
>>>> in-flight data in the TaskManager’s memory according to doc), and it could
>>>> resolve the performance issue. *So we want to understand better what
>>>> are the tradeoffs in choosing between these 2 stateBackend.* Our
>>>> checkpoint size is 200 - 300 GB in stable state. For now we know one
>>>> benefits of RocksDB is it supports incremental checkpoint, but would love
>>>> to know what else we are losing in choosing FsStateBackend.
>>>>
>>>
>>> As of now, feature-wise both backends support asynchronous snapshotting,
>>> state schema evolution, and access via the State Processor API.
>>> In the end, the major factor for deciding between the two state backends
>>> would be your expected state size.
>>> That being said, it could be possible in the future that savepoint
>>> formats for the backends are changed to be compatible, meaning that you
>>> will be able to switch between different backends upon restore [1].
>>>
>>>
>>>>
>>>> Thanks a lot!
>>>> Ran Zhang
>>>>
>>>
>>> Cheers,
>>> Gordon
>>>
>>>  [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>>>
>>

Re: FsStateBackend vs RocksDBStateBackend

Posted by Robert Metzger <rm...@apache.org>.
I would try the FsStateBackend in this scenario, as you have enough memory
available.

On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <ra...@pinterest.com> wrote:

> Hi Gordon,
>
> Thanks for your reply! Regarding state size - we are at 200-300gb but we
> have 120 parallelism which will make each task handle ~2 - 3 gb state.
> (when we submit the job we are setting tm memory to 15g.) In this scenario
> what will be the best fit for statebackend?
>
> Thanks,
> Ran
>
> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi Ran,
>>
>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ra...@pinterest.com> wrote:
>>
>>> Hi all,
>>>
>>> We have a Flink app that uses a KeyedProcessFunction, and in the
>>> function it requires a ValueState(of TreeSet) and the processElement method
>>> needs to access and update it. We tried to use RocksDB as our stateBackend
>>> but the performance is not good, and intuitively we think it was because of
>>> the serialization / deserialization on each processElement call.
>>>
>>
>> As you have already pointed out, serialization behaviour is a major
>> difference between the 2 state backends, and will directly impact
>> performance due to the extra runtime overhead in RocksDB.
>> If you plan to continue using the RocksDB state backend, make sure to use
>> MapState instead of ValueState where possible, since every access to the
>> ValueState in the RocksDB backend requires serializing / deserializing the
>> whole value.
>> For MapState, de-/serialization happens per K-V access. Whether or not
>> this makes sense would of course depend on your state access pattern.
>>
>>
>>> Then we tried to switch to use FsStateBackend (which keeps the in-flight
>>> data in the TaskManager’s memory according to doc), and it could resolve
>>> the performance issue. *So we want to understand better what are the
>>> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint
>>> size is 200 - 300 GB in stable state. For now we know one benefits of
>>> RocksDB is it supports incremental checkpoint, but would love to know what
>>> else we are losing in choosing FsStateBackend.
>>>
>>
>> As of now, feature-wise both backends support asynchronous snapshotting,
>> state schema evolution, and access via the State Processor API.
>> In the end, the major factor for deciding between the two state backends
>> would be your expected state size.
>> That being said, it could be possible in the future that savepoint
>> formats for the backends are changed to be compatible, meaning that you
>> will be able to switch between different backends upon restore [1].
>>
>>
>>>
>>> Thanks a lot!
>>> Ran Zhang
>>>
>>
>> Cheers,
>> Gordon
>>
>>  [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>>
>

Re: FsStateBackend vs RocksDBStateBackend

Posted by Ran Zhang <ra...@pinterest.com>.
Hi Gordon,

Thanks for your reply! Regarding state size: we are at 200-300 GB, but we
run with a parallelism of 120, so each task handles roughly 2 - 3 GB of
state. (When we submit the job we set the TM memory to 15 GB.) In this
scenario, which state backend would be the best fit?
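
For reference, the per-task figure is just the total divided by the parallelism. A small sketch of that arithmetic, assuming state is spread evenly over all subtasks (which key skew can break) and using made-up helper names:

```java
/** Rough sizing arithmetic (not Flink code): average state per parallel subtask. */
final class StateSizing {

    /** Average state per subtask in GB, assuming keys are spread evenly. */
    static double perSubtaskGb(double totalStateGb, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return totalStateGb / parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 120;
        for (double totalGb : new double[] {200, 300}) {
            System.out.printf("%.0f GB / %d subtasks = %.2f GB per subtask%n",
                    totalGb, parallelism, perSubtaskGb(totalGb, parallelism));
        }
    }
}
```

That gives roughly 1.7 to 2.5 GB per subtask. Note that these are checkpoint sizes; the on-heap footprint of the same state may well be larger than its serialized form, so some headroom is worth leaving.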

Thanks,
Ran

On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Ran,
>
> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ra...@pinterest.com> wrote:
>
>> Hi all,
>>
>> We have a Flink app that uses a KeyedProcessFunction, and in the function
>> it requires a ValueState(of TreeSet) and the processElement method needs to
>> access and update it. We tried to use RocksDB as our stateBackend but the
>> performance is not good, and intuitively we think it was because of the
>> serialization / deserialization on each processElement call.
>>
>
> As you have already pointed out, serialization behaviour is a major
> difference between the 2 state backends, and will directly impact
> performance due to the extra runtime overhead in RocksDB.
> If you plan to continue using the RocksDB state backend, make sure to use
> MapState instead of ValueState where possible, since every access to the
> ValueState in the RocksDB backend requires serializing / deserializing the
> whole value.
> For MapState, de-/serialization happens per K-V access. Whether or not
> this makes sense would of course depend on your state access pattern.
>
>
>> Then we tried to switch to use FsStateBackend (which keeps the in-flight
>> data in the TaskManager’s memory according to doc), and it could resolve
>> the performance issue. *So we want to understand better what are the
>> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint size
>> is 200 - 300 GB in stable state. For now we know one benefits of RocksDB is
>> it supports incremental checkpoint, but would love to know what else we are
>> losing in choosing FsStateBackend.
>>
>
> As of now, feature-wise both backends support asynchronous snapshotting,
> state schema evolution, and access via the State Processor API.
> In the end, the major factor for deciding between the two state backends
> would be your expected state size.
> That being said, it could be possible in the future that savepoint formats
> for the backends are changed to be compatible, meaning that you will be
> able to switch between different backends upon restore [1].
>
>
>>
>> Thanks a lot!
>> Ran Zhang
>>
>
> Cheers,
> Gordon
>
>  [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>

Re: FsStateBackend vs RocksDBStateBackend

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Ran,

On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ra...@pinterest.com> wrote:

> Hi all,
>
> We have a Flink app that uses a KeyedProcessFunction, and in the function
> it requires a ValueState(of TreeSet) and the processElement method needs to
> access and update it. We tried to use RocksDB as our stateBackend but the
> performance is not good, and intuitively we think it was because of the
> serialization / deserialization on each processElement call.
>

As you have already pointed out, serialization behaviour is a major
difference between the 2 state backends, and will directly impact
performance due to the extra runtime overhead in RocksDB.
If you plan to continue using the RocksDB state backend, make sure to use
MapState instead of ValueState where possible, since every access to the
ValueState in the RocksDB backend requires serializing / deserializing the
whole value.
For MapState, de-/serialization happens per K-V access. Whether or not this
makes sense would of course depend on your state access pattern.
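
To illustrate the point above, here is a toy model in plain Java. It is not Flink code and uses no Flink APIs. It assumes, for illustration only, that every element is a long and that the whole set is rewritten on each update, as would happen for a ValueState holding a TreeSet on RocksDB; the class and method names are made up. It compares the bytes written against a per-entry layout in the spirit of MapState:

```java
import java.nio.ByteBuffer;
import java.util.TreeSet;

/** Toy model (not Flink code) of bytes serialized per update for two state layouts. */
final class SerializationCostModel {

    /** Whole-value layout: each update re-serializes the complete set. */
    static long bytesForWholeValueUpdates(int updates) {
        TreeSet<Long> set = new TreeSet<>();
        long total = 0;
        for (long i = 0; i < updates; i++) {
            set.add(i);
            // Size prefix plus every element, written again on every update.
            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + Long.BYTES * set.size());
            buffer.putInt(set.size());
            for (long element : set) {
                buffer.putLong(element);
            }
            total += buffer.position();
        }
        return total;
    }

    /** Per-entry layout: each update serializes only the element that changed. */
    static long bytesForPerEntryUpdates(int updates) {
        long total = 0;
        for (long i = 0; i < updates; i++) {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(i);
            total += buffer.position();
        }
        return total;
    }

    public static void main(String[] args) {
        int updates = 1_000;
        System.out.println("whole value: " + bytesForWholeValueUpdates(updates) + " bytes");
        System.out.println("per entry:   " + bytesForPerEntryUpdates(updates) + " bytes");
    }
}
```

In this model, 1,000 inserts write 4,008,000 bytes with the whole-value layout and 8,000 bytes with the per-entry layout: quadratic versus linear growth. Real numbers depend on the serializer and on how often the state is read as well as written, so treat this as a sketch only.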


> Then we tried to switch to use FsStateBackend (which keeps the in-flight
> data in the TaskManager’s memory according to doc), and it could resolve
> the performance issue. *So we want to understand better what are the
> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint size
> is 200 - 300 GB in stable state. For now we know one benefits of RocksDB is
> it supports incremental checkpoint, but would love to know what else we are
> losing in choosing FsStateBackend.
>

As of now, feature-wise both backends support asynchronous snapshotting,
state schema evolution, and access via the State Processor API.
In the end, the major factor for deciding between the two state backends
would be your expected state size.
That being said, it could be possible in the future that savepoint formats
for the backends are changed to be compatible, meaning that you will be
able to switch between different backends upon restore [1].


>
> Thanks a lot!
> Ran Zhang
>

Cheers,
Gordon

 [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State