You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by David Causse <dc...@wikimedia.org> on 2021/09/08 12:44:55 UTC

State processor API very slow reading a keyed state with RocksDB

Hi,

I'm investigating why a job we use to inspect a flink state is a lot slower
than the bootstrap job used to generate it.

I use RocksdbDB with a simple keyed value state mapping a string key to a
long value. Generating the bootstrap state from a CSV file with 100M
entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
allowed). But another job that does the opposite (converts this state into
a CSV file) takes several hours. I would have expected these two job
runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce the problem. This program has 3
jobs:
- CreateState: generate a keyed state (string->long) using the state
processor api
- StreamJob: reads all the keys using a StreamingExecutionEnvironment
- ReadState: reads all the keys using the state processor api

Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
StreamJob are done in less than a minute.
ReadState is much slower (> 30minutes) on my system. The RocksDB state
appears to be restored relatively quickly but after that the slots are
performing at very different speeds. Some slots finish quickly but some
others struggle to advance.
Looking at the thread dumps I always see threads in org.rocksdb.RocksDB.get:

"DataSource (at readKeyedState(ExistingSavepoint.java:314)
(org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
RUNNABLE
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:2084)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)

It seems suspiciously slow to me and I'm wondering if I'm missing something
in the way the state processor api works.

Thanks for your help!

David.

1: https://github.com/nomoa/rocksdb-state-processor-test

Re: State processor API very slow reading a keyed state with RocksDB

Posted by David Causse <dc...@wikimedia.org>.
Thank you all for the great insights and suggestions!

I understand that the underlying components used by the state processor api
are sufficiently different that it may explain this slowness and this
behavior is not something caused by the way we use this API.

David.

On Fri, Sep 10, 2021 at 5:27 AM Yun Tang <my...@live.com> wrote:

> Hi David,
>
> I think Seth had shared some useful information.
>
> If you want to know what happened within RocksDB when you're reading, you
> can leverage async-profiler [1] to catch the RocksDB stacks and I guess
> that index block might be evicted too frequently during your read. And we
> could use new read option which disable fillCache [2] to speedup bulk scan
> in the future to help improve the performance.
>
>
> Best
> Yun Tang
>
> [1] https://github.com/jvm-profiling-tools/async-profiler
> [2]
> https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)
> ------------------------------
> *From:* Seth Wiesman <sj...@gmail.com>
> *Sent:* Friday, September 10, 2021 0:58
> *To:* David Causse <dc...@wikimedia.org>; user <us...@flink.apache.org>
> *Cc:* Piotr Nowojski <pn...@apache.org>
> *Subject:* Re: State processor API very slow reading a keyed state with
> RocksDB
>
> Hi David,
>
> I was also able to reproduce the behavior, but was able to get
> significant performance improvements by reducing the number of slots on
> each TM to 1.
>
> My suspicion, as Piotr alluded to, has to do with the different runtime
> execution of DataSet over DataStream. In particular, Flink's DataStream
> operators are aware of the resource requirements of the state backend and
> include RocksDB in its internal memory configurations. In the state
> processor api, the underlying input format is a blackbox.
>
> Another thing to know is that when running multiple RocksDB instances
> within the same JVM, you are actually running a single native process with
> multiple logical instances. I _think_ we are seeing contention amongst the
> logical RocksDB instances.
>
> Even with one slot, it is not as fast as I would like and will need to
> continue investigating. If my suspicion for the slowness is correct, we
> will need to migrate to the new Source API and improve this as part of
> DataStream integration. This migration is something we'd like to do
> regardless, but I don't have a timeline to share.
>
> *Aside: Why is writing still relatively fast? *
>
> Even with these factors accounted for, I do still expect writing to be
> faster than reading. This is due to both how RocksDB internal data
> structures work, along with some peculiarities of how to state processor
> API has to perform reads.
>
> 1. RocksDB internally uses a data structure called a log structured merge
> tree (or LSM). This means writes are always implemented as appends, so
> there is no seek required. Additionally, writes go into an in-memory data
> structure called a MemTable that is flushed to disk asynchronously.
> Because there may be multiple entries for a given key, RocksDB needs to
> search for the most recent value and potentially read from disk. This may
> be alleviated by enabling bloom filters but does have memory costs.
>
> 2. RocksDB is a key value store, so Flink represents each registered state
> (ValueState, ListState, etc) as its own column family (table). A key only
> exists in a table if it has a non-null value. This means not all keys exist
> in all column families for a given operator. The state-proc-api wants to
> make it appear as if each operator is composed of a single table with
> multiple columns. To do this, we perform a full table scan on one column
> family and then do point lookups of that key on the others. However, we
> still need to find the keys that may only exist in other tables. The trick
> we perform is to delete keys from rocksDB after each read, so we can do
> full table scans on all column families but never see any duplicates. This
> means the reader is performing multiple reads and writes on every call to
> `readKey` and is more expensive than it may appear.
>
> Seth
>
>
> On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski <pn...@apache.org>
> wrote:
>
> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs and I was not able to make much sense out of those
> results. There are no IO/Memory bottlenecks that I could notice, it looks
> indeed like the Job is stuck inside RocksDB itself. This might be an issue
> with for example memory configuration. Streaming jobs and State Processor
> API are running in very different environments as the latter one is using
> DataSet API under the hood, so maybe that can explain this? However I'm no
> expert in neither DataSet API nor the RocksDB, so it's hard for me to make
> progress here.
>
> Maybe someone else can help here?
>
> Piotrek
>
>
> śr., 8 wrz 2021 o 14:45 David Causse <dc...@wikimedia.org> napisał(a):
>
> Hi,
>
> I'm investigating why a job we use to inspect a flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksdbDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>
>

Re: State processor API very slow reading a keyed state with RocksDB

Posted by Yun Tang <my...@live.com>.
Hi David,

I think Seth had shared some useful information.

If you want to know what happened within RocksDB when you're reading, you can leverage async-profiler [1] to catch the RocksDB stacks and I guess that index block might be evicted too frequently during your read. And we could use new read option which disable fillCache [2] to speedup bulk scan in the future to help improve the performance.


Best
Yun Tang

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)
________________________________
From: Seth Wiesman <sj...@gmail.com>
Sent: Friday, September 10, 2021 0:58
To: David Causse <dc...@wikimedia.org>; user <us...@flink.apache.org>
Cc: Piotr Nowojski <pn...@apache.org>
Subject: Re: State processor API very slow reading a keyed state with RocksDB

Hi David,

I was also able to reproduce the behavior, but was able to get significant performance improvements by reducing the number of slots on each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime execution of DataSet over DataStream. In particular, Flink's DataStream operators are aware of the resource requirements of the state backend and include RocksDB in its internal memory configurations. In the state processor api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances within the same JVM, you are actually running a single native process with multiple logical instances. I _think_ we are seeing contention amongst the logical RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to continue investigating. If my suspicion for the slowness is correct, we will need to migrate to the new Source API and improve this as part of DataStream integration. This migration is something we'd like to do regardless, but I don't have a timeline to share.

Aside: Why is writing still relatively fast?

Even with these factors accounted for, I do still expect writing to be faster than reading. This is due to both how RocksDB internal data structures work, along with some peculiarities of how to state processor API has to perform reads.

1. RocksDB internally uses a data structure called a log structured merge tree (or LSM). This means writes are always implemented as appends, so there is no seek required. Additionally, writes go into an in-memory data structure called a MemTable that is flushed to disk asynchronously.  Because there may be multiple entries for a given key, RocksDB needs to search for the most recent value and potentially read from disk. This may be alleviated by enabling bloom filters but does have memory costs.

2. RocksDB is a key value store, so Flink represents each registered state (ValueState, ListState, etc) as its own column family (table). A key only exists in a table if it has a non-null value. This means not all keys exist in all column families for a given operator. The state-proc-api wants to make it appear as if each operator is composed of a single table with multiple columns. To do this, we perform a full table scan on one column family and then do point lookups of that key on the others. However, we still need to find the keys that may only exist in other tables. The trick we perform is to delete keys from rocksDB after each read, so we can do full table scans on all column families but never see any duplicates. This means the reader is performing multiple reads and writes on every call to `readKey` and is more expensive than it may appear.

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski <pn...@apache.org>> wrote:
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried profiling/flame graphs and I was not able to make much sense out of those results. There are no IO/Memory bottlenecks that I could notice, it looks indeed like the Job is stuck inside RocksDB itself. This might be an issue with for example memory configuration. Streaming jobs and State Processor API are running in very different environments as the latter one is using DataSet API under the hood, so maybe that can explain this? However I'm no expert in neither DataSet API nor the RocksDB, so it's hard for me to make progress here.

Maybe someone else can help here?

Piotrek


śr., 8 wrz 2021 o 14:45 David Causse <dc...@wikimedia.org>> napisał(a):
Hi,

I'm investigating why a job we use to inspect a flink state is a lot slower than the bootstrap job used to generate it.

I use RocksdbDB with a simple keyed value state mapping a string key to a long value. Generating the bootstrap state from a CSV file with 100M entries takes a couple minutes over 12 slots spread over 3 TM (4Gb allowed). But another job that does the opposite (converts this state into a CSV file) takes several hours. I would have expected these two job runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce the problem. This program has 3 jobs:
- CreateState: generate a keyed state (string->long) using the state processor api
- StreamJob: reads all the keys using a StreamingExecutionEnvironment
- ReadState: reads all the keys using the state processor api

Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState & StreamJob are done in less than a minute.
ReadState is much slower (> 30minutes) on my system. The RocksDB state appears to be restored relatively quickly but after that the slots are performing at very different speeds. Some slots finish quickly but some others struggle to advance.
Looking at the thread dumps I always see threads in org.rocksdb.RocksDB.get:

"DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371 RUNNABLE
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:2084)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
at org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)

It seems suspiciously slow to me and I'm wondering if I'm missing something in the way the state processor api works.

Thanks for your help!

David.

1: https://github.com/nomoa/rocksdb-state-processor-test

Re: State processor API very slow reading a keyed state with RocksDB

Posted by Seth Wiesman <sj...@gmail.com>.
Hi David,

I was also able to reproduce the behavior, but was able to get
significant performance improvements by reducing the number of slots on
each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime
execution of DataSet over DataStream. In particular, Flink's DataStream
operators are aware of the resource requirements of the state backend and
include RocksDB in its internal memory configurations. In the state
processor api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances
within the same JVM, you are actually running a single native process with
multiple logical instances. I _think_ we are seeing contention amongst the
logical RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to
continue investigating. If my suspicion for the slowness is correct, we
will need to migrate to the new Source API and improve this as part of
DataStream integration. This migration is something we'd like to do
regardless, but I don't have a timeline to share.

*Aside: Why is writing still relatively fast? *

Even with these factors accounted for, I do still expect writing to be
faster than reading. This is due to both how RocksDB internal data
structures work, along with some peculiarities of how to state processor
API has to perform reads.

1. RocksDB internally uses a data structure called a log structured merge
tree (or LSM). This means writes are always implemented as appends, so
there is no seek required. Additionally, writes go into an in-memory data
structure called a MemTable that is flushed to disk asynchronously.
Because there may be multiple entries for a given key, RocksDB needs to
search for the most recent value and potentially read from disk. This may
be alleviated by enabling bloom filters but does have memory costs.

2. RocksDB is a key value store, so Flink represents each registered state
(ValueState, ListState, etc) as its own column family (table). A key only
exists in a table if it has a non-null value. This means not all keys exist
in all column families for a given operator. The state-proc-api wants to
make it appear as if each operator is composed of a single table with
multiple columns. To do this, we perform a full table scan on one column
family and then do point lookups of that key on the others. However, we
still need to find the keys that may only exist in other tables. The trick
we perform is to delete keys from rocksDB after each read, so we can do
full table scans on all column families but never see any duplicates. This
means the reader is performing multiple reads and writes on every call to
`readKey` and is more expensive than it may appear.

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski <pn...@apache.org> wrote:

> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs and I was not able to make much sense out of those
> results. There are no IO/Memory bottlenecks that I could notice, it looks
> indeed like the Job is stuck inside RocksDB itself. This might be an issue
> with for example memory configuration. Streaming jobs and State Processor
> API are running in very different environments as the latter one is using
> DataSet API under the hood, so maybe that can explain this? However I'm no
> expert in neither DataSet API nor the RocksDB, so it's hard for me to make
> progress here.
>
> Maybe someone else can help here?
>
> Piotrek
>
>
> śr., 8 wrz 2021 o 14:45 David Causse <dc...@wikimedia.org> napisał(a):
>
>> Hi,
>>
>> I'm investigating why a job we use to inspect a flink state is a lot
>> slower than the bootstrap job used to generate it.
>>
>> I use RocksdbDB with a simple keyed value state mapping a string key to a
>> long value. Generating the bootstrap state from a CSV file with 100M
>> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
>> allowed). But another job that does the opposite (converts this state into
>> a CSV file) takes several hours. I would have expected these two job
>> runtimes to be in the same ballpark.
>>
>> I wrote a simple test case[1] to reproduce the problem. This program has
>> 3 jobs:
>> - CreateState: generate a keyed state (string->long) using the state
>> processor api
>> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
>> - ReadState: reads all the keys using the state processor api
>>
>> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
>> StreamJob are done in less than a minute.
>> ReadState is much slower (> 30minutes) on my system. The RocksDB state
>> appears to be restored relatively quickly but after that the slots are
>> performing at very different speeds. Some slots finish quickly but some
>> others struggle to advance.
>> Looking at the thread dumps I always see threads in
>> org.rocksdb.RocksDB.get:
>>
>> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
>> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
>> RUNNABLE
>> at org.rocksdb.RocksDB.get(Native Method)
>> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
>> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
>> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
>> at
>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
>> at
>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
>> at
>> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>>
>> It seems suspiciously slow to me and I'm wondering if I'm missing
>> something in the way the state processor api works.
>>
>> Thanks for your help!
>>
>> David.
>>
>> 1: https://github.com/nomoa/rocksdb-state-processor-test
>>
>

Re: State processor API very slow reading a keyed state with RocksDB

Posted by Piotr Nowojski <pn...@apache.org>.
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried
profiling/flame graphs and I was not able to make much sense out of those
results. There are no IO/Memory bottlenecks that I could notice, it looks
indeed like the Job is stuck inside RocksDB itself. This might be an issue
with for example memory configuration. Streaming jobs and State Processor
API are running in very different environments as the latter one is using
DataSet API under the hood, so maybe that can explain this? However I'm no
expert in neither DataSet API nor the RocksDB, so it's hard for me to make
progress here.

Maybe someone else can help here?

Piotrek


śr., 8 wrz 2021 o 14:45 David Causse <dc...@wikimedia.org> napisał(a):

> Hi,
>
> I'm investigating why a job we use to inspect a flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksdbDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>