Posted to user@flink.apache.org by vtygoss <vt...@126.com> on 2022/07/22 05:33:43 UTC

Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

Hi, Jing Ge!


Thanks for your reply. As you suggested, I tried FLASH_SSD_OPTIMIZED, but the job is significantly slower, by about 40% (at 40 minutes it had produced 60% of the output records), than with the options without FLASH_SSD_OPTIMIZED.






1. All RocksDB options
- flink conf:
   state.backend: rocksdb
   state.backend.rocksdb.localdir: /mnt/disk12/hadoop/docker/yarn/local/flink (SSD)
   state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED (as you suggested, together with the configuration below)
- code:
Because a single record is fairly large and YARN has plenty of memory, I use a larger block size and a larger write buffer size and count. The only difference from the previous test is "FLASH_SSD_OPTIMIZED".
```
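import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory

private def optimizeRocksDB(factory: DefaultConfigurableOptionsFactory): DefaultConfigurableOptionsFactory = {
  // Tuning that is not SSD-specific (same values as in the previous test)

  // Memtables: larger and more write buffers
  factory.setWriteBufferSize("128MB")
  factory.setMaxWriteBufferNumber(8)

  // Reads: larger blocks and block cache, no limit on open files
  factory.setBlockSize("64KB")
  factory.setBlockCacheSize("256MB")
  factory.setMaxOpenFiles(-1)

  // Level sizing
  factory.setMaxSizeLevelBase("256MB")
  factory.setUseDynamicLevelSize(true)

  // Flush and compaction
  factory.setMinWriteBufferNumberToMerge(3)
  factory.setTargetFileSizeBase("128MB")
  factory.setMaxBackgroundThreads(8)
  factory
}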
```


2. Backpressure info for this case
- stream graph: ...... Join -> Not-Null-Enforcer -> SinkMaterializer -> bucket-assigner (Apache Hudi) -> hdfs writer
- per operator:
   Join:               backpressure 20~40% (stable)
   Not-Null-Enforcer:  backpressure 70% or more (stable); flame graph: Task#MailBox#park (99%)
   SinkMaterializer:   backpressure suddenly 2%, 30%, or 90%; flame graph: RocksDB put and get || native checkpoint
   bucket-assigner:    backpressure 0~10% (stable); flame graph: RocksDB put and get
 
Regarding backpressure, I think the sink-materializer is a stable bottleneck, because its upstream operator not-null-enforcer is steadily backpressured; and the bucket-assigner operator is an unstable bottleneck, which is why the backpressure of the sink-materializer is unstable.


Regarding the flame graphs:
- for the SinkMaterializer operator, the stack samples show two performance problems: one is the native (incremental) RocksDB checkpoint, the other is "put into & get from RocksDB"; both are shown in the pictures below.
- for the bucket-assigner operator, only "put into & get from RocksDB" shows up as a performance problem, as below.


So, did I do something wrong or miss some parameters?


Best Regards! 
























On Jul 22, 2022 at 04:10, Jing Ge <ji...@ververica.com> wrote:


Hi,


Using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This optimization can improve the source throughput and reduce the delayed write rate.


If this optimization didn't fix the back pressure, could you share more information about your job? Could you check the metric of the back pressured operator, e.g. check if it is caused by write-heavy or read-heavy tasks? You could try tuning rocksdb.writebuffer for write-heavy tasks.
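
To make the write-heavy versus read-heavy check concrete, here is a minimal flink-conf.yaml sketch. It assumes the RocksDB native metric switches (off by default) and the write buffer keys of Flink 1.13; the write buffer values are simply the ones from the factory code earlier in this thread, not a recommendation.

```yaml
# Sketch only: expose a few RocksDB native metrics (disabled by default, each adds some overhead)
state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.is-write-stopped: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true

# Write buffer tuning through configuration instead of code
# (values copied from the options factory in this thread)
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.number-to-merge: 3
```

A non-zero delayed write rate or a steadily growing pending-compaction estimate on the back pressured operator would suggest write stalls rather than slow reads.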


On Thu, Jul 21, 2022 at 5:59 PM Yaroslav Tkachenko <ya...@goldsky.io> wrote:

Hi!


I'd try re-running the SSD test with the following config options:

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED





On Thu, Jul 21, 2022 at 4:11 AM vtygoss <vt...@126.com> wrote:

Hi, community!


I am doing some performance tests based on my use case.


1. Environment
- Flink: 1.13.5
- StateBackend: RocksDB, incremental
- use case: a complex SQL job with 7 joins and 2 aggregations; the input is 30,000,000 records and the output is 60,000,000 records (about 80GB).
- resource: flink on yarn. JM 2G, one TM 24G(8G on-heap, 16G off-heap). 3 slots per TM
- only difference: different config 'state.backend.rocksdb.localdir', one SATA disk or one SSD disk.


2. Random write performance difference between SATA and SSD
   4.8M/s is achieved using SATA, while 48.2M/s is achieved using SSD.
   ```
   fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync  -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 -bs=8k  -numjobs=64 -filename=/mnt/disk11/xx
   ``` 


3. In my use case, the Flink SQL application finished in 41 minutes using SATA, and in 45 minutes using SSD.


Does this comparison suggest that using an SSD is not an effective way to improve RocksDB performance?
The direct downstream of the back pressured operator is HdfsSink; does that mean the best target for improving application performance is HDFS?


Thanks for any replies or suggestions.


Best Regards!

Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

I share the concerns about how representative a single run of ~45 minutes
is. A much longer run, or more runs, would probably give a different
picture.
Also, I think it would be easier to analyze the bottlenecks in the SATA
case first. If they are not related to storage, then I wouldn't expect any
improvements, as Hong pointed out.

Additionally, checkpointing might be faster with SSD; but more frequent
checkpoints might produce more SST layers - and that might slow down the
reads (and IO in general because of compaction).
So probably it makes sense to compare without checkpointing first (after
making sure that storage IO is the bottleneck).
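
One way to set up such a no-checkpoint baseline, as a sketch: it assumes checkpointing is switched on through flink-conf.yaml rather than in the job code, and the interval shown is a placeholder, not a value from this thread.

```yaml
# Baseline run: leave the interval unset so that no periodic checkpoints are triggered.
# execution.checkpointing.interval: 5min   # placeholder; re-enable for the comparison run
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /mnt/disk12/hadoop/docker/yarn/local/flink
```

If the job enables checkpointing in code instead, the equivalent change would have to be made there.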

Regards,
Roman

