Posted to dev@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2021/12/28 14:22:47 UTC

[DISCUSS] FLIP-201: Persist local state in working directory

Hi everyone,

I would like to start a discussion about using the working directory to
persist local state for faster recovery (FLIP-201) [1]. Persisting the
local state will be beneficial if a crashed process is restarted with the
same working directory. In this case, Flink does not have to download the
state artifacts again and can recover locally.
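
The recovery decision described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the FLIP or the POC: the class name, the `localState/chk-<id>` directory layout, and the `Source` enum are all hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: prefer a local state copy that survived a restart. */
final class LocalRecoverySketch {

    enum Source { LOCAL, REMOTE }

    /**
     * Returns LOCAL if the working directory still holds a copy of the given
     * checkpoint, otherwise REMOTE (the state artifacts would have to be
     * downloaded again). The "localState/chk-<id>" layout is an assumption
     * made for this illustration only.
     */
    static Source chooseSource(Path workingDir, long checkpointId) {
        Path localCopy = workingDir.resolve("localState").resolve("chk-" + checkpointId);
        return Files.isDirectory(localCopy) ? Source.LOCAL : Source.REMOTE;
    }
}
```

A process restarted with a fresh working directory would always end up in the REMOTE branch, which is why reusing the same directory matters.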

A POC can be found here [2].

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/wJuqCw
[2] https://github.com/tillrohrmann/flink/tree/FLIP-201

Cheers,
Till

Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yun,

I assume that most people will use this feature in k8s-like deployment
environments. In theory, though, it works anywhere you can establish a
stable relationship between volumes and Flink processes. If Flink processes
are restarted on different nodes, then of course you need volumes that can
be mounted on those nodes for this feature to work.
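
One way to picture such a stable relationship is to derive the working directory from an identifier that survives restarts (for instance a stable pod name). The sketch below is an assumption for illustration: the class, the method, and the `tm_<id>` naming are hypothetical and not taken from the FLIP.

```java
import java.nio.file.Path;

/** Hypothetical sketch: map a stable process id to a directory on a volume. */
final class WorkingDirSketch {

    /**
     * The same id always yields the same directory, so a restarted process
     * that keeps its id (and can mount the same volume) finds its old state.
     */
    static Path workingDirFor(Path volumeRoot, String stableProcessId) {
        if (stableProcessId == null || stableProcessId.isEmpty()) {
            throw new IllegalArgumentException("a stable process id is required");
        }
        return volumeRoot.resolve("tm_" + stableProcessId);
    }
}
```

If the id changes on every restart, each process gets a new, empty directory and nothing can be recovered locally.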

Cheers,
Till

On Mon, Jan 10, 2022 at 9:50 AM Yun Tang <my...@live.com> wrote:

> I think this feature could indeed speed up recovery in the case of node
> failure.
>
> It seems this feature would only work well in k8s-like deployment
> environments?
>
>
> Best,
> Yun Tang

Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by Yun Tang <my...@live.com>.
I think this feature could indeed speed up recovery in the case of node failure.

It seems this feature would only work well in k8s-like deployment environments?


Best,
Yun Tang
________________________________
From: David Morávek <dm...@apache.org>
Sent: Wednesday, January 5, 2022 19:51
To: dev <de...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-201: Persist local state in working directory

+1 the general direction here seems pretty solid

D.



Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by David Morávek <dm...@apache.org>.
+1 the general direction here seems pretty solid

D.


On Wed, Jan 5, 2022 at 11:57 AM Till Rohrmann <tr...@apache.org> wrote:

> If there is no further major feedback, I would start the vote soonish.
>
> Cheers,
> Till

Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by Till Rohrmann <tr...@apache.org>.
If there is no further major feedback, I would start the vote soonish.

Cheers,
Till

On Thu, Dec 30, 2021 at 4:28 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi David,
>
> Thanks for your feedback.
>
> By graceful shutdown I mean a way to stop the TaskManager and to clean up
> the working directory. At the moment, I think we always kill the process
> via SIGTERM or SIGKILL. This won't clean up the working directory because
> such a termination could also originate from a process failure. I think
> what Niklas is doing is introducing a signal handler that reacts to SIGTERM
> by disconnecting from the JobMaster.
>
> You are right that by default Flink will now set the RocksDB directory to
> the working temp directory. Before, it defaulted to the spilling
> directories. I think this is not a problem because users can still manually
> configure multiple RocksDB directories via state.backend.rocksdb.localdir.
> Moreover, I am not sure how well this mechanism works in practice. Flink
> simply iterates through the directories when creating new RocksDB state
> backends, without much smartness. If one of the directories is full, Flink
> won't use another one but will simply fail.
>
> I do see the point of a proper serialization format and I agree that we
> should eventually implement it. My reasoning was that the PR is already
> quite big. Strictly speaking, the serialization format is not required for
> this feature, so I would prefer getting the PR in and then tackling this
> problem as a follow-up instead of increasing the scope of the changes
> further. I hope that this makes sense.
>
> I will also respond to your PR comments.
>
> Cheers,
> Till

Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

Thanks for your feedback.

By graceful shutdown I mean a way to stop the TaskManager and to clean up
the working directory. At the moment, I think we always kill the process
via SIGTERM or SIGKILL. This won't clean up the working directory because
such a termination could also originate from a process failure. I think
what Niklas is doing is introducing a signal handler that reacts to SIGTERM
by disconnecting from the JobMaster.
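
The distinction between a kill and a graceful stop could look roughly like the sketch below. It is a minimal illustration under assumptions, not the FLINK-25277 implementation: the class and method names are hypothetical, and the only real API relied on is the JVM shutdown hook, which runs on SIGTERM but never on SIGKILL.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

/** Hypothetical sketch: clean the working directory only after an explicit stop. */
final class GracefulShutdownSketch {
    private final Path workingDir;
    private final AtomicBoolean gracefulStopRequested = new AtomicBoolean(false);

    GracefulShutdownSketch(Path workingDir) {
        this.workingDir = workingDir;
    }

    /** Would be called by an explicit stop command, not by a signal. */
    void requestGracefulStop() {
        gracefulStopRequested.set(true);
    }

    /** Returns true if the working directory was cleaned up. */
    boolean onShutdown() {
        if (!gracefulStopRequested.get()) {
            return false; // may be a failure: keep local state for recovery
        }
        deleteRecursively(workingDir);
        return true;
    }

    /** Registers the logic above as a JVM shutdown hook. */
    void installHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::onShutdown, "working-dir-cleanup"));
    }

    /** Deletes children before parents by visiting paths in reverse order. */
    private static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A plain SIGTERM leaves the flag unset, so the directory survives and can be reused after a restart.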

You are right that by default Flink will now set the RocksDB directory to
the working temp directory. Before, it defaulted to the spilling
directories. I think this is not a problem because users can still manually
configure multiple RocksDB directories via state.backend.rocksdb.localdir.
Moreover, I am not sure how well this mechanism works in practice. Flink
simply iterates through the directories when creating new RocksDB state
backends, without much smartness. If one of the directories is full, Flink
won't use another one but will simply fail.
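
To make the concern concrete, here is a small sketch of directory selection that just cycles through the configured paths. It is not Flink's actual code: the class is hypothetical, and it assumes the option value is a comma-separated list of directories.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: hand out configured directories in turn. */
final class RoundRobinDirsSketch {
    private final List<String> dirs;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDirsSketch(List<String> dirs) {
        if (dirs.isEmpty()) {
            throw new IllegalArgumentException("need at least one directory");
        }
        this.dirs = Collections.unmodifiableList(new ArrayList<>(dirs));
    }

    /** Parses a comma-separated value (assumed format for this sketch). */
    static RoundRobinDirsSketch parse(String commaSeparated) {
        List<String> parsed = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                parsed.add(trimmed);
            }
        }
        return new RoundRobinDirsSketch(parsed);
    }

    /** Returns the next directory; free space is deliberately not checked. */
    String nextDir() {
        int index = Math.floorMod(next.getAndIncrement(), dirs.size());
        return dirs.get(index);
    }
}
```

Because nothing here looks at free space, a full directory is still handed out when its turn comes, which matches the failure mode described above.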

I do see the point of a proper serialization format and I agree that we
should eventually implement it. My reasoning was that the PR is already
quite big. Strictly speaking, the serialization format is not required for
this feature, so I would prefer getting the PR in and then tackling this
problem as a follow-up instead of increasing the scope of the changes
further. I hope that this makes sense.

I will also respond to your PR comments.

Cheers,
Till

On Thu, Dec 30, 2021 at 4:00 PM David Morávek <dm...@apache.org> wrote:

> Hi Till,
>
> thanks for drafting the FLIP, it looks really good. I did a quick pass over
> the PR and it seems to be heading in the right direction.
>
> > It might be required to introduce a graceful shutdown of the TaskExecutor
> > in order to support proper cleanup of resources.
>
> This is actively being worked on by Niklas in FLINK-25277 [1].
>
> In the PR, I've seen that you're also replacing the directories for storing
> the local state with the working directory. Should this be a concern? Is
> RocksDB, for example, able to leverage multiple mount paths for spreading
> the load?
>
> I'd also be in favor of introducing a proper (evolving) serialization
> format right away instead of the Java serialization, but no hard feelings
> if we don't.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25277
>
> Best,
> D.

Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by David Morávek <dm...@apache.org>.
Hi Till,

thanks for drafting the FLIP, it looks really good. I did a quick pass over
the PR and it seems to be heading in the right direction.

> It might be required to introduce a graceful shutdown of the TaskExecutor
> in order to support proper cleanup of resources.

This is actively being worked on by Niklas in FLINK-25277 [1].

In the PR, I've seen that you're also replacing the directories for storing
the local state with the working directory. Should this be a concern? Is
RocksDB, for example, able to leverage multiple mount paths for spreading
the load?

I'd also be in favor of introducing a proper (evolving) serialization
format right away instead of the Java serialization, but no hard feelings
if we don't.
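
As a rough idea of what an evolving format could mean in practice, the sketch below writes an explicit version number ahead of the payload so that a reader can reject or migrate older layouts. Everything in it is an assumption for illustration: the `Entry` fields, the class name, and the layout are hypothetical and not what the PR stores.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch: a tiny versioned binary format instead of Java serialization. */
final class VersionedFormatSketch {
    static final int CURRENT_VERSION = 1;

    /** Illustrative payload only. */
    static final class Entry {
        final long checkpointId;
        final String path;

        Entry(long checkpointId, String path) {
            this.checkpointId = checkpointId;
            this.path = path;
        }
    }

    /** Writes the version first, then the fields in a fixed order. */
    static byte[] write(Entry entry) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(CURRENT_VERSION);
            out.writeLong(entry.checkpointId);
            out.writeUTF(entry.path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the version and refuses layouts this reader does not know. */
    static Entry read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != CURRENT_VERSION) {
                throw new IllegalStateException("unsupported format version " + version);
            }
            return new Entry(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A later version would add a branch in `read` for the old layout, which is the kind of evolution that plain Java serialization makes hard to control.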

[1] https://issues.apache.org/jira/browse/FLINK-25277

Best,
D.

On Wed, Dec 29, 2021 at 4:58 PM Till Rohrmann <tr...@apache.org> wrote:

> I've created a draft PR for the desired changes [1]. It might be easier to
> take a look at than the branch.
>
> [1] https://github.com/apache/flink/pull/18237
>
> Cheers,
> Till

Re: [DISCUSS] FLIP-201: Persist local state in working directory

Posted by Till Rohrmann <tr...@apache.org>.
I've created a draft PR for the desired changes [1]. It might be easier to
take a look at than the branch.

[1] https://github.com/apache/flink/pull/18237

Cheers,
Till

On Tue, Dec 28, 2021 at 3:22 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi everyone,
>
> I would like to start a discussion about using the working directory to
> persist local state for faster recovery (FLIP-201) [1]. Persisting the
> local state will be beneficial if a crashed process is restarted with the
> same working directory. In this case, Flink does not have to download the
> state artifacts again and can recover locally.
>
> A POC can be found here [2].
>
> Looking forward to your feedback.
>
> [1] https://cwiki.apache.org/confluence/x/wJuqCw
> [2] https://github.com/tillrohrmann/flink/tree/FLIP-201
>
> Cheers,
> Till
>