Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/09/04 05:36:56 UTC

State Storage Questions

Hello!

I've been digging into State Storage documentation, but it's left me
scratching my head with a few questions. Any help will be much appreciated.

Qs:
1. Is there a way to use the RocksDB state backend for Flink on AWS EMR?
Possibly with S3-backed savepoints for recovery (or maybe HDFS for
savepoints)? The only AWS-related documentation I can find makes it look
like AWS must use the S3 File System state backend and not RocksDB at all.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html

2. Does the FS state backend not compact? I thought everything in Flink was
stored as key/value. In which case, why would the last n values for a key
need to stick around, or how would they?
> An incremental checkpoint builds upon (typically multiple) previous
> checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
> way that is self-consolidating over time. As a result, the incremental
> checkpoint history in Flink does not grow indefinitely, and old checkpoints
> are eventually subsumed and pruned automatically.

3. In the docs, Operators are referred to as non-keyed state, yet,
Operators have IDs that they are keyed by, so why are they referred to as
non-keyed state?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids

4. For the Table API / SQL, are primary keys and join keys automatically
used as the keys for state under the hood?

Lastly
5. Is there a way to estimate roughly how much disk space state storage
will take per operation?

Thanks again!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: State Storage Questions

Posted by Rex Fenley <Re...@remind101.com>.
Thanks a bunch!

>For example, the Flink Kafka source operator's parallel instances maintain
as operator state a mapping of partitions to offsets for the partitions
that it is assigned to.

This I think clarifies things. This is literally state for the operator to
do its job, not really row data. The Table API/SQL will use "Keyed State"
for rows entirely separately.

Thanks!

On Mon, Sep 7, 2020 at 11:51 PM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi!
>
> Operator state is bound to a single parallel operator instance; there is
> no partitioning happening here.
> It is typically used in Flink source and sink operators. For example, the
> Flink Kafka source operator's parallel instances maintain as operator state
> a mapping of partitions to offsets for the partitions that it is assigned
> to. State like this isn't partitionable by any key associated with an
> input DataStream.
>
> Since there is no partitioning scheme, redistribution of the state on
> operator rescale also happens differently compared to keyed state.
> Take for example a ListState; in contrast to a keyed ListState, an
> Operator ListState is a collection of state items that are independent from
> each other and eligible for redistribution across operator instances in the
> event of a rescale (by default, Flink uses simple round-robin for the
> redistribution).
> In other words, the list entries are the finest granularity at which the
> operator state can be redistributed, and should not be correlated with each
> other since each entry of the list may end up in different parallel
> operator instances on rescale.
>
> In general, there should rarely be a need to use operator state for
> typical user applications. It isn't massively scalable and usually is small
> in size.
>
> Cheers,
> Gordon

Re: State Storage Questions

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi!

Operator state is bound to a single parallel operator instance; there is no
partitioning happening here.
It is typically used in Flink source and sink operators. For example, the
Flink Kafka source operator's parallel instances maintain as operator state
a mapping of partitions to offsets for the partitions that it is assigned
to. State like this isn't partitionable by any key associated with an
input DataStream.

Since there is no partitioning scheme, redistribution of the state on
operator rescale also happens differently compared to keyed state.
Take for example a ListState; in contrast to a keyed ListState, an Operator
ListState is a collection of state items that are independent from each
other and eligible for redistribution across operator instances in the
event of a rescale (by default, Flink uses simple round-robin for the
redistribution).
In other words, the list entries are the finest granularity at which the
operator state can be redistributed, and should not be correlated with each
other since each entry of the list may end up in different parallel
operator instances on rescale.
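
As a rough illustration of the redistribution described above, here is a
minimal Java sketch that models operator list state as plain lists and deals
the entries out round-robin to a new number of parallel instances. It does
not use the Flink API; the class name, method name, and the
"partition=offset" strings are made up for the example, and Flink's actual
repartitioning may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, Flink-free model of redistributing operator ListState on rescale. */
final class OperatorListStateRescaleSketch {

    /**
     * Collects every list entry from the old parallel instances and deals the
     * entries out round-robin to the new instances. Each entry is treated as
     * independent of all others.
     */
    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> newSubtaskStates = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newSubtaskStates.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> subtaskState : oldSubtaskStates) {
            for (T entry : subtaskState) {
                // The list entry is the finest unit that can be moved.
                newSubtaskStates.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return newSubtaskStates;
    }

    public static void main(String[] args) {
        // Hypothetical state of two parallel instances: one "partition=offset" entry each.
        List<List<String>> before = List.of(
                List.of("p0=100", "p1=250", "p2=75"),
                List.of("p3=10", "p4=990"));
        // Rescale from 2 to 3 parallel instances.
        System.out.println(redistribute(before, 3));
    }
}
```

Going from 2 to 3 instances, the five entries end up split 2/2/1, and entries
that lived on the same instance before the rescale can land on different
instances afterwards, which is why they should not depend on each other.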

In general, there should rarely be a need to use operator state for typical
user applications. It isn't massively scalable and usually is small in size.

Cheers,
Gordon

On Sat, Sep 5, 2020 at 12:26 AM Rex Fenley <Re...@remind101.com> wrote:

> This is so helpful, thank you!
>
> So just to clarify (3), Operator state has a partitioning scheme, but it's
> simply not by key, it's something else that's special under-the-hood? In
> which case, what data is stored in an Operator? I assumed it must be the
> input data for e.g. a join, so that it can react efficiently to any data
> changes in the stream and recombine only what has actually changed. Is this
> correct?

Re: State Storage Questions

Posted by Rex Fenley <Re...@remind101.com>.
This is so helpful, thank you!

So just to clarify (3), Operator state has a partitioning scheme, but it's
simply not by key, it's something else that's special under-the-hood? In
which case, what data is stored in an Operator? I assumed it must be the
input data for e.g. a join, so that it can react efficiently to any data
changes in the stream and recombine only what has actually changed. Is this
correct?

On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Hello!
>>
>> I've been digging into State Storage documentation, but it's left me
>> scratching my head with a few questions. Any help will be much appreciated.
>>
>> Qs:
>> 1. Is there a way to use RocksDB state backend for Flink on AWS EMR?
>> Possibly with S3 backed savepoints for recovery (or maybe hdfs for
>> savepoints?)? Only documentation related to AWS I can find makes it look
>> like AWS must use the S3 File System state backend and not RocksDB at all.
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>>
>
> I think there's some misunderstanding of the role of RocksDB vs
> filesystems for fault-tolerance here.
> RocksDB is a state backend option that manages user state out-of-core, and
> is managed by the Flink runtime. Users do not need to separately manage
> RocksDB instances.
> For persistence of that state as checkpoints / savepoints for
> fault-tolerance, you may choose the commonly used filesystems like S3 /
> HDFS.
>
> See [1] for how to configure your job to use RocksDBStateBackend as the
> runtime state backend and how to configure a filesystem path for persistence.
>
>
>>
>> 2. Does the FS state backend not compact? I thought everything in Flink
>> was stored as key/value. In which case, why would the last n values for a
>> key need to stick around, or how would they?
>> > An incremental checkpoint builds upon (typically multiple) previous
>> checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
>> way that is self-consolidating over time. As a result, the incremental
>> checkpoint history in Flink does not grow indefinitely, and old checkpoints
>> are eventually subsumed and pruned automatically.
>>
>>
> The sentence that you quote simply states how Flink leverages RocksDB's
> background compaction of SSTables to ensure that incremental checkpoints
> don't grow indefinitely in size.
> This has nothing to do with the FsStateBackend, as incremental
> checkpointing isn't supported there.
>
> To clarify, in case there is another misunderstanding here:
> The difference between FsStateBackend and RocksDBStateBackend is how local
> state is maintained at runtime.
> RocksDBStateBackend uses RocksDB, while the FsStateBackend keeps state in
> in-memory hash maps. For persistence, both are checkpointed to a filesystem
> for fault-tolerance.
> The naming may be a bit confusing, so I just wanted to clarify that here in
> case it caused any confusion with the questions above.
>
>
>> 3. In the docs, Operators are referred to as non-keyed state, yet,
>> Operators have IDs that they are keyed by, so why are they referred to as
>> non-keyed state?
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>>
>>
> Operator state is referred to as non-keyed state because it is not
> co-partitioned with the stream by key and its values are not bound to a
> single key (i.e. when you access keyed state, the access is bound to a
> single key), and because it has different schemes for repartitioning when
> operators are scaled up or down.
> The operator IDs you referred to are simply unique IDs that identify the
> same operators across different executions of the same job. I'm not sure
> what you mean by "operators have IDs that are keyed by"; those IDs are not
> used in any partitioning operation.
>
>
>
>> 4. For the Table API / SQL are primary keys and join keys automatically
>> used as the keys for state under the hood?
>>
>
> Yes.
>
>
>>
>> Lastly
>> 5. Is there a way to estimate roughly how much disk space state storage
>> will take per operation?
>>
>>
>> Thanks again!
>>
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#configuring-a-state-backend
>
>


Re: State Storage Questions

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley <Re...@remind101.com> wrote:

> Hello!
>
> I've been digging into State Storage documentation, but it's left me
> scratching my head with a few questions. Any help will be much appreciated.
>
> Qs:
> 1. Is there a way to use RocksDB state backend for Flink on AWS EMR?
> Possibly with S3 backed savepoints for recovery (or maybe hdfs for
> savepoints?)? Only documentation related to AWS I can find makes it look
> like AWS must use the S3 File System state backend and not RocksDB at all.
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>

I think there's some misunderstanding of the role of RocksDB vs filesystems
for fault-tolerance here.
RocksDB is a state backend option that manages user state out-of-core, and
is managed by the Flink runtime. Users do not need to separately manage
RocksDB instances.
For persistence of that state as checkpoints / savepoints for
fault-tolerance, you may choose the commonly used filesystems like S3 /
HDFS.

See [1] for how to configure your job to use RocksDBStateBackend as the
runtime state backend and how to configure a filesystem path for persistence.
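
As a sketch of that setup, a flink-conf.yaml along these lines selects
RocksDB as the runtime state backend and points checkpoints and savepoints at
S3. The bucket name and paths are placeholders, and it assumes an S3
filesystem implementation is available to Flink on the cluster; see [1] for
the authoritative options.

```yaml
# flink-conf.yaml (sketch; bucket name and paths are placeholders)

# Keep working state in RocksDB, managed by the Flink runtime on local disk.
state.backend: rocksdb

# Incremental checkpoints are a RocksDB backend feature.
state.backend.incremental: true

# Where checkpoints and savepoints are persisted for fault-tolerance.
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints
```

An hdfs:// path could be used in the same two settings instead of S3.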


>
> 2. Does the FS state backend not compact? I thought everything in Flink
> was stored as key/value. In which case, why would the last n values for a
> key need to stick around, or how would they?
> > An incremental checkpoint builds upon (typically multiple) previous
> checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
> way that is self-consolidating over time. As a result, the incremental
> checkpoint history in Flink does not grow indefinitely, and old checkpoints
> are eventually subsumed and pruned automatically.
>
>
The sentence that you quote simply states how Flink leverages RocksDB's
background compaction of SSTables to ensure that incremental checkpoints
don't grow indefinitely in size.
This has nothing to do with the FsStateBackend, as incremental
checkpointing isn't supported there.

To clarify, in case there is another misunderstanding here:
The difference between FsStateBackend and RocksDBStateBackend is how local
state is maintained at runtime.
RocksDBStateBackend uses RocksDB, while the FsStateBackend keeps state in
in-memory hash maps. For persistence, both are checkpointed to a filesystem
for fault-tolerance.
The naming may be a bit confusing, so I just wanted to clarify that here in
case it caused any confusion with the questions above.


> 3. In the docs, Operators are referred to as non-keyed state, yet,
> Operators have IDs that they are keyed by, so why are they referred to as
> non-keyed state?
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>
>
Operator state is referred to as non-keyed state because it is not
co-partitioned with the stream by key and its values are not bound to a
single key (i.e. when you access keyed state, the access is bound to a
single key), and because it has different schemes for repartitioning when
operators are scaled up or down.
The operator IDs you referred to are simply unique IDs that identify the
same operators across different executions of the same job. I'm not sure
what you mean by "operators have IDs that are keyed by"; those IDs are not
used in any partitioning operation.
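
To make the contrast with keyed state concrete, the following Java sketch
models how a record's key determines which parallel instance holds its state.
It assumes Flink's key-group scheme (key -> key group -> parallel instance)
and substitutes a plain hashCode() for the hash Flink applies internally, so
the concrete numbers will not match a real job; all names are hypothetical.

```java
/** Simplified, Flink-free model of how a key is routed to one parallel instance. */
final class KeyedStateRoutingSketch {

    /** Maps a key to a key group; a plain hashCode stands in for Flink's own hash. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel instance that owns it. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // The same key always resolves to the same key group and instance,
        // so state access for that key is bound to exactly one place.
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            int group = keyGroupFor(key, maxParallelism);
            int subtask = subtaskFor(group, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + group + " -> subtask " + subtask);
        }
    }
}
```

Operator state has no such routing step: it belongs to whichever parallel
instance wrote it, and the operator IDs in a savepoint only identify the
operator itself.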



> 4. For the Table API / SQL are primary keys and join keys automatically
> used as the keys for state under the hood?
>

Yes.


>
> Lastly
> 5. Is there a way to estimate roughly how much disk space state storage
> will take per operation?
>
>
> Thanks again!
>

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#configuring-a-state-backend