You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2021/12/10 16:54:18 UTC

[DISCUSS] FLIP-198: Working directory for Flink processes

Hi everyone,

I would like to start a discussion about introducing an explicit working
directory for Flink processes that can be used to store information [1].
Per default this working directory will reside in the temporary directory
of the node Flink runs on. However, if configured to reside on a persistent
volume, then this information can be used to recover from process/node
failures. Moreover, such a working directory can be used to consolidate
some of our other directories Flink creates under /tmp (e.g. blobStorage,
RocksDB working directory).

Here is a draft PR that outlines the required changes [2].

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/ZZiqCw
[2] https://github.com/apache/flink/pull/18083

Cheers,
Till

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Till Rohrmann <tr...@apache.org>.
I've updated the FLIP and excluded the blob storage from the initial scope.
I hope that I could answer all your questions. I will now start a vote. If
there are still things that we want to discuss, then please post to this
thread.

Cheers,
Till

On Thu, Dec 16, 2021 at 12:37 PM Till Rohrmann <tr...@apache.org> wrote:

> Maybe it is a good idea to remove storing the blobs in the working
> directory from this FLIP and to address the problems with doing this as a
> follow up. This will keep the FLIP narrowly scoped and faster to implement.
> I will update the FLIP and move the blob storage part to the follow-up
> section.
>
> Cheers,
> Till
>
> On Thu, Dec 16, 2021 at 10:59 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi David,
>>
>> I think such an utility can be helpful. I would suggest adding something
>> like this once it is needed by a component.
>>
>> Currently, I think only the BlobServer might be susceptible to this
>> problem because we don't fsync the written bytes and then don't use an
>> atomic rename operation. If we change this, then I think we should not be
>> affected by this problem. For the BlobStore we have some detection
>> mechanism in place that ensures that you download the correct blob using a
>> MessageDigest. For the BlobCache we probably should add a check that the
>> locally stored file has the same MessageDigest as expected and if not, then
>> delete the file and refetch it from the BlobServer/BlobStore.
>>
>> The RocksDB working directory will be cleaned up with every process
>> restart and the local state directory is not used across process restarts
>> at the moment.
>>
>> Cheers,
>> Till
>>
>> On Thu, Dec 16, 2021 at 9:13 AM David Morávek <dm...@apache.org> wrote:
>>
>>> Hi Till,
>>>
>>> thanks for drafting this FLIP, I think it's really a valuable
>>> improvement.
>>>
>>> Agreed with Yang, that YARN / k8s implementation should be out of scope
>>> of
>>> this FLIP. Just few notes on the possible integrations:
>>>
>>> For k8s, I think we can also benefit from this FLIP without StatefulSet.
>>> If
>>> the pod crashes for some reason, it will be restarted -> it's still on
>>> the
>>> same node, but it looses the state. This could be addressed by attaching
>>> an
>>> ephemeral volume to the container [1]. This is somewhere it between the
>>> current state & the persistent volume (this is where you need a
>>> StatefulSet) approach, that could be expensive (depends on the
>>> infrastructure).
>>>
>>> Example:
>>>
>>> apiVersion: v1
>>> kind: Pod
>>> metadata:
>>>   name: test-pod
>>> spec:
>>>   containers:
>>>   - image: ...
>>>     name: test-container
>>>     volumeMounts:
>>>     - mountPath: /cache
>>>       name: cache-volume
>>>   volumes:
>>>   - name: cache-volume
>>>     emptyDir: {}
>>>
>>> For YARN, I don't think it's as simple as remembering prior locations. As
>>> far as I remember the "restart from failure" results in a new container
>>> being created and the storage is tied with a container's lifecycle and
>>> the
>>> working directories are garbage collected right after the container
>>> FAILS /
>>> FINISHES. We'd most likely have to leverage a new component (something
>>> along the lines of how the shuffle services for YARN work), that runs
>>> embedded in NodeManager and allows you to externalize files for
>>> out-of-the-container-lifecycle use, and that ties their lifecycle with
>>> the
>>> job.
>>>
>>> As for the Chesnay's concern around corrupted files, are we sure that all
>>> components can recover from a corrupted file? Could we for example have a
>>> generic mechanism, that is reused by all the components writing to the
>>> working directory (CRC + File)?
>>>
>>> Other than that, I really like the FLIP and looking forward to have this
>>> feature in Flink +1.
>>>
>>> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>>>
>>> Best,
>>> D.
>>>
>>> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>> > I am afraid creating a dedicated StatefulSet for each TaskManager is
>>> too
>>> > expensive and using a shared StatefulSet for all
>>> > the TaskManagers is not flexible enough. Maybe setting a proper restart
>>> > policy for TaskManager pods could benefit from
>>> > this FLIP. But we might need to tackle some other issues, e.g.
>>> duplicated
>>> > registration, etc.
>>> >
>>> > All in all, this is out of the scope of this FLIP. I agree we could
>>> leave
>>> > it in the future FLIPs.
>>> >
>>> > I have no more concerns. +1
>>> >
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > Till Rohrmann <tr...@apache.org> 于2021年12月15日周三 19:06写道:
>>> >
>>> > > This is true. But this is not a new problem and I think that Flink
>>> should
>>> > > be susceptible to this problem already. One solution for this
>>> concrete
>>> > case
>>> > > could be that the BlobServer stores some checksums and validates the
>>> file
>>> > > before serving it to the TM.
>>> > >
>>> > > Cheers,
>>> > > Till
>>> > >
>>> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <
>>> chesnay@apache.org>
>>> > > wrote:
>>> > >
>>> > > > The issue with corrupted files is that some of them aren't read by
>>> the
>>> > > > component that stores them.
>>> > > > For example, a file can be corrupted in the blob server of the JM,
>>> but
>>> > > > that it is corrupted will only be noticed by the TaskExecutor.
>>> > > >
>>> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
>>> > > > > Thanks everyone for your feedback. Let me try to address it by
>>> > grouping
>>> > > > > some of the individual comments:
>>> > > > >
>>> > > > > ### Will this feature work for native Yarn and K8s deployments?
>>> > > > >
>>> > > > > The working directory is an optional feature that can be used to
>>> > > recover
>>> > > > > additional information. You can think of it like a cache. If the
>>> > > working
>>> > > > > directory is there, then Flink can do certain things a bit
>>> faster but
>>> > > in
>>> > > > > the worst case it will have to retrieve the required information
>>> from
>>> > > the
>>> > > > > JobManager or persistent storage.
>>> > > > >
>>> > > > > In order to make it work with native Yarn and K8s, we would have
>>> to
>>> > > > change
>>> > > > > these modes slightly. First of all, we would have to be able to
>>> map
>>> > > > working
>>> > > > > directories to processes and then set a deterministic resource
>>> ids
>>> > for
>>> > > > the
>>> > > > > processes. For K8s this could be easily achievable by using a
>>> > > StatefulSet
>>> > > > > as the deployment mechanism for TaskExecutors. For Yarn, we
>>> probably
>>> > > > would
>>> > > > > have to remember the prior locations of a process. Both things
>>> are
>>> > > > > potential follow ups that I don't want to tackle in this FLIP.
>>> > > > >
>>> > > > > If one of the modes configures the working directory to be on a
>>> full
>>> > or
>>> > > > > broken disk, then the process will fail. I think this is not all
>>> that
>>> > > > > different from the current state where some things in Flink will
>>> fail
>>> > > if
>>> > > > > they picked the wrong/full temporary directory (e.g. blob storage
>>> > > > > directory).
>>> > > > >
>>> > > > > ### Cleanup
>>> > > > >
>>> > > > > The working directory will be cleaned up if the Flink process is
>>> > > > gracefully
>>> > > > > shut down. This means that the JobManager process will clean it
>>> up if
>>> > > it
>>> > > > > runs in application mode and the job is terminated. SIGTERM and
>>> > SIGKILL
>>> > > > > signals will be treated as an ungraceful shutdown and therefore
>>> they
>>> > > > won't
>>> > > > > clean up the working directory. This means that we probably also
>>> > need a
>>> > > > > graceful way for shutting TaskManager processes down in the
>>> future
>>> > > > because
>>> > > > > right now they are in most cases killed in order to shut them
>>> down.
>>> > If
>>> > > > the
>>> > > > > user uses the tmp directory, then any left-over working
>>> directories
>>> > > will
>>> > > > be
>>> > > > > cleaned up with the next system restart. This is somewhat
>>> similar to
>>> > > how
>>> > > > > RocksDB's working directory is currently cleaned up as well.
>>> > > > >
>>> > > > > ### Corrupted files
>>> > > > >
>>> > > > > The working directory itself won't give you any guarantees. It
>>> will
>>> > be
>>> > > > the
>>> > > > > responsibility of the component that uses the working directory
>>> to
>>> > make
>>> > > > > sure that it can deal with corrupted files. E.g. if the component
>>> > > cannot
>>> > > > > read the file, then it should delete it and fall back to the
>>> remote
>>> > > > > storage/ground truth to retrieve the required information.
>>> > > > >
>>> > > > > I hope this could answer your questions. Let me know if you have
>>> more
>>> > > > > feedback.
>>> > > > >
>>> > > > > Cheers,
>>> > > > > Till
>>> > > > >
>>> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com>
>>> > wrote:
>>> > > > >
>>> > > > >> I like the idea. It can reuse the disk to do many things. Isn't
>>> it
>>> > > only
>>> > > > >> for inner failover? If not, the cleaning may be a problem. Also,
>>> > many
>>> > > > >> resource components have their own disk schedule strategy.
>>> > > > >>
>>> > > > >> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
>>> > > > >>
>>> > > > >>> How do you intend to handle corrupted files, in particular due
>>> to
>>> > > > >>> process crashes during a write?
>>> > > > >>> Will all writes to a cached directory append some suffix (e.g.,
>>> > > > >>> ".pending") and do a rename?
>>> > > > >>>
>>> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
>>> > > > >>>> Hi everyone,
>>> > > > >>>>
>>> > > > >>>> I would like to start a discussion about introducing an
>>> explicit
>>> > > > working
>>> > > > >>>> directory for Flink processes that can be used to store
>>> > information
>>> > > > [1].
>>> > > > >>>> Per default this working directory will reside in the
>>> temporary
>>> > > > >>> directory
>>> > > > >>>> of the node Flink runs on. However, if configured to reside
>>> on a
>>> > > > >>> persistent
>>> > > > >>>> volume, then this information can be used to recover from
>>> > > process/node
>>> > > > >>>> failures. Moreover, such a working directory can be used to
>>> > > > consolidate
>>> > > > >>>> some of our other directories Flink creates under /tmp (e.g.
>>> > > > >>> blobStorage,
>>> > > > >>>> RocksDB working directory).
>>> > > > >>>>
>>> > > > >>>> Here is a draft PR that outlines the required changes [2].
>>> > > > >>>>
>>> > > > >>>> Looking forward to your feedback.
>>> > > > >>>>
>>> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>>> > > > >>>> [2] https://github.com/apache/flink/pull/18083
>>> > > > >>>>
>>> > > > >>>> Cheers,
>>> > > > >>>> Till
>>> > > > >>>>
>>> > > > >>>
>>> > > >
>>> > > >
>>> > >
>>> >
>>>
>>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Till Rohrmann <tr...@apache.org>.
Maybe it is a good idea to remove storing the blobs in the working
directory from this FLIP and to address the problems with doing this as a
follow up. This will keep the FLIP narrowly scoped and faster to implement.
I will update the FLIP and move the blob storage part to the follow-up
section.

Cheers,
Till

On Thu, Dec 16, 2021 at 10:59 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi David,
>
> I think such an utility can be helpful. I would suggest adding something
> like this once it is needed by a component.
>
> Currently, I think only the BlobServer might be susceptible to this
> problem because we don't fsync the written bytes and then don't use an
> atomic rename operation. If we change this, then I think we should not be
> affected by this problem. For the BlobStore we have some detection
> mechanism in place that ensures that you download the correct blob using a
> MessageDigest. For the BlobCache we probably should add a check that the
> locally stored file has the same MessageDigest as expected and if not, then
> delete the file and refetch it from the BlobServer/BlobStore.
>
> The RocksDB working directory will be cleaned up with every process
> restart and the local state directory is not used across process restarts
> at the moment.
>
> Cheers,
> Till
>
> On Thu, Dec 16, 2021 at 9:13 AM David Morávek <dm...@apache.org> wrote:
>
>> Hi Till,
>>
>> thanks for drafting this FLIP, I think it's really a valuable improvement.
>>
>> Agreed with Yang, that YARN / k8s implementation should be out of scope of
>> this FLIP. Just few notes on the possible integrations:
>>
>> For k8s, I think we can also benefit from this FLIP without StatefulSet.
>> If
>> the pod crashes for some reason, it will be restarted -> it's still on the
>> same node, but it looses the state. This could be addressed by attaching
>> an
>> ephemeral volume to the container [1]. This is somewhere it between the
>> current state & the persistent volume (this is where you need a
>> StatefulSet) approach, that could be expensive (depends on the
>> infrastructure).
>>
>> Example:
>>
>> apiVersion: v1
>> kind: Pod
>> metadata:
>>   name: test-pod
>> spec:
>>   containers:
>>   - image: ...
>>     name: test-container
>>     volumeMounts:
>>     - mountPath: /cache
>>       name: cache-volume
>>   volumes:
>>   - name: cache-volume
>>     emptyDir: {}
>>
>> For YARN, I don't think it's as simple as remembering prior locations. As
>> far as I remember the "restart from failure" results in a new container
>> being created and the storage is tied with a container's lifecycle and the
>> working directories are garbage collected right after the container FAILS
>> /
>> FINISHES. We'd most likely have to leverage a new component (something
>> along the lines of how the shuffle services for YARN work), that runs
>> embedded in NodeManager and allows you to externalize files for
>> out-of-the-container-lifecycle use, and that ties their lifecycle with the
>> job.
>>
>> As for the Chesnay's concern around corrupted files, are we sure that all
>> components can recover from a corrupted file? Could we for example have a
>> generic mechanism, that is reused by all the components writing to the
>> working directory (CRC + File)?
>>
>> Other than that, I really like the FLIP and looking forward to have this
>> feature in Flink +1.
>>
>> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>>
>> Best,
>> D.
>>
>> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <da...@gmail.com> wrote:
>>
>> > I am afraid creating a dedicated StatefulSet for each TaskManager is too
>> > expensive and using a shared StatefulSet for all
>> > the TaskManagers is not flexible enough. Maybe setting a proper restart
>> > policy for TaskManager pods could benefit from
>> > this FLIP. But we might need to tackle some other issues, e.g.
>> duplicated
>> > registration, etc.
>> >
>> > All in all, this is out of the scope of this FLIP. I agree we could
>> leave
>> > it in the future FLIPs.
>> >
>> > I have no more concerns. +1
>> >
>> >
>> > Best,
>> > Yang
>> >
>> > Till Rohrmann <tr...@apache.org> 于2021年12月15日周三 19:06写道:
>> >
>> > > This is true. But this is not a new problem and I think that Flink
>> should
>> > > be susceptible to this problem already. One solution for this concrete
>> > case
>> > > could be that the BlobServer stores some checksums and validates the
>> file
>> > > before serving it to the TM.
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <chesnay@apache.org
>> >
>> > > wrote:
>> > >
>> > > > The issue with corrupted files is that some of them aren't read by
>> the
>> > > > component that stores them.
>> > > > For example, a file can be corrupted in the blob server of the JM,
>> but
>> > > > that it is corrupted will only be noticed by the TaskExecutor.
>> > > >
>> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
>> > > > > Thanks everyone for your feedback. Let me try to address it by
>> > grouping
>> > > > > some of the individual comments:
>> > > > >
>> > > > > ### Will this feature work for native Yarn and K8s deployments?
>> > > > >
>> > > > > The working directory is an optional feature that can be used to
>> > > recover
>> > > > > additional information. You can think of it like a cache. If the
>> > > working
>> > > > > directory is there, then Flink can do certain things a bit faster
>> but
>> > > in
>> > > > > the worst case it will have to retrieve the required information
>> from
>> > > the
>> > > > > JobManager or persistent storage.
>> > > > >
>> > > > > In order to make it work with native Yarn and K8s, we would have
>> to
>> > > > change
>> > > > > these modes slightly. First of all, we would have to be able to
>> map
>> > > > working
>> > > > > directories to processes and then set a deterministic resource ids
>> > for
>> > > > the
>> > > > > processes. For K8s this could be easily achievable by using a
>> > > StatefulSet
>> > > > > as the deployment mechanism for TaskExecutors. For Yarn, we
>> probably
>> > > > would
>> > > > > have to remember the prior locations of a process. Both things are
>> > > > > potential follow ups that I don't want to tackle in this FLIP.
>> > > > >
>> > > > > If one of the modes configures the working directory to be on a
>> full
>> > or
>> > > > > broken disk, then the process will fail. I think this is not all
>> that
>> > > > > different from the current state where some things in Flink will
>> fail
>> > > if
>> > > > > they picked the wrong/full temporary directory (e.g. blob storage
>> > > > > directory).
>> > > > >
>> > > > > ### Cleanup
>> > > > >
>> > > > > The working directory will be cleaned up if the Flink process is
>> > > > gracefully
>> > > > > shut down. This means that the JobManager process will clean it
>> up if
>> > > it
>> > > > > runs in application mode and the job is terminated. SIGTERM and
>> > SIGKILL
>> > > > > signals will be treated as an ungraceful shutdown and therefore
>> they
>> > > > won't
>> > > > > clean up the working directory. This means that we probably also
>> > need a
>> > > > > graceful way for shutting TaskManager processes down in the future
>> > > > because
>> > > > > right now they are in most cases killed in order to shut them
>> down.
>> > If
>> > > > the
>> > > > > user uses the tmp directory, then any left-over working
>> directories
>> > > will
>> > > > be
>> > > > > cleaned up with the next system restart. This is somewhat similar
>> to
>> > > how
>> > > > > RocksDB's working directory is currently cleaned up as well.
>> > > > >
>> > > > > ### Corrupted files
>> > > > >
>> > > > > The working directory itself won't give you any guarantees. It
>> will
>> > be
>> > > > the
>> > > > > responsibility of the component that uses the working directory to
>> > make
>> > > > > sure that it can deal with corrupted files. E.g. if the component
>> > > cannot
>> > > > > read the file, then it should delete it and fall back to the
>> remote
>> > > > > storage/ground truth to retrieve the required information.
>> > > > >
>> > > > > I hope this could answer your questions. Let me know if you have
>> more
>> > > > > feedback.
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com>
>> > wrote:
>> > > > >
>> > > > >> I like the idea. It can reuse the disk to do many things. Isn't
>> it
>> > > only
>> > > > >> for inner failover? If not, the cleaning may be a problem. Also,
>> > many
>> > > > >> resource components have their own disk schedule strategy.
>> > > > >>
>> > > > >> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
>> > > > >>
>> > > > >>> How do you intend to handle corrupted files, in particular due
>> to
>> > > > >>> process crashes during a write?
>> > > > >>> Will all writes to a cached directory append some suffix (e.g.,
>> > > > >>> ".pending") and do a rename?
>> > > > >>>
>> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
>> > > > >>>> Hi everyone,
>> > > > >>>>
>> > > > >>>> I would like to start a discussion about introducing an
>> explicit
>> > > > working
>> > > > >>>> directory for Flink processes that can be used to store
>> > information
>> > > > [1].
>> > > > >>>> Per default this working directory will reside in the temporary
>> > > > >>> directory
>> > > > >>>> of the node Flink runs on. However, if configured to reside on
>> a
>> > > > >>> persistent
>> > > > >>>> volume, then this information can be used to recover from
>> > > process/node
>> > > > >>>> failures. Moreover, such a working directory can be used to
>> > > > consolidate
>> > > > >>>> some of our other directories Flink creates under /tmp (e.g.
>> > > > >>> blobStorage,
>> > > > >>>> RocksDB working directory).
>> > > > >>>>
>> > > > >>>> Here is a draft PR that outlines the required changes [2].
>> > > > >>>>
>> > > > >>>> Looking forward to your feedback.
>> > > > >>>>
>> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>> > > > >>>> [2] https://github.com/apache/flink/pull/18083
>> > > > >>>>
>> > > > >>>> Cheers,
>> > > > >>>> Till
>> > > > >>>>
>> > > > >>>
>> > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

I think such an utility can be helpful. I would suggest adding something
like this once it is needed by a component.

Currently, I think only the BlobServer might be susceptible to this problem
because we don't fsync the written bytes and then don't use an atomic
rename operation. If we change this, then I think we should not be affected
by this problem. For the BlobStore we have some detection mechanism in
place that ensures that you download the correct blob using a
MessageDigest. For the BlobCache we probably should add a check that the
locally stored file has the same MessageDigest as expected and if not, then
delete the file and refetch it from the BlobServer/BlobStore.

The RocksDB working directory will be cleaned up with every process restart
and the local state directory is not used across process restarts at the
moment.

Cheers,
Till

On Thu, Dec 16, 2021 at 9:13 AM David Morávek <dm...@apache.org> wrote:

> Hi Till,
>
> thanks for drafting this FLIP, I think it's really a valuable improvement.
>
> Agreed with Yang, that YARN / k8s implementation should be out of scope of
> this FLIP. Just few notes on the possible integrations:
>
> For k8s, I think we can also benefit from this FLIP without StatefulSet. If
> the pod crashes for some reason, it will be restarted -> it's still on the
> same node, but it looses the state. This could be addressed by attaching an
> ephemeral volume to the container [1]. This is somewhere it between the
> current state & the persistent volume (this is where you need a
> StatefulSet) approach, that could be expensive (depends on the
> infrastructure).
>
> Example:
>
> apiVersion: v1
> kind: Pod
> metadata:
>   name: test-pod
> spec:
>   containers:
>   - image: ...
>     name: test-container
>     volumeMounts:
>     - mountPath: /cache
>       name: cache-volume
>   volumes:
>   - name: cache-volume
>     emptyDir: {}
>
> For YARN, I don't think it's as simple as remembering prior locations. As
> far as I remember the "restart from failure" results in a new container
> being created and the storage is tied with a container's lifecycle and the
> working directories are garbage collected right after the container FAILS /
> FINISHES. We'd most likely have to leverage a new component (something
> along the lines of how the shuffle services for YARN work), that runs
> embedded in NodeManager and allows you to externalize files for
> out-of-the-container-lifecycle use, and that ties their lifecycle with the
> job.
>
> As for the Chesnay's concern around corrupted files, are we sure that all
> components can recover from a corrupted file? Could we for example have a
> generic mechanism, that is reused by all the components writing to the
> working directory (CRC + File)?
>
> Other than that, I really like the FLIP and looking forward to have this
> feature in Flink +1.
>
> [1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/
>
> Best,
> D.
>
> On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <da...@gmail.com> wrote:
>
> > I am afraid creating a dedicated StatefulSet for each TaskManager is too
> > expensive and using a shared StatefulSet for all
> > the TaskManagers is not flexible enough. Maybe setting a proper restart
> > policy for TaskManager pods could benefit from
> > this FLIP. But we might need to tackle some other issues, e.g. duplicated
> > registration, etc.
> >
> > All in all, this is out of the scope of this FLIP. I agree we could leave
> > it in the future FLIPs.
> >
> > I have no more concerns. +1
> >
> >
> > Best,
> > Yang
> >
> > Till Rohrmann <tr...@apache.org> 于2021年12月15日周三 19:06写道:
> >
> > > This is true. But this is not a new problem and I think that Flink
> should
> > > be susceptible to this problem already. One solution for this concrete
> > case
> > > could be that the BlobServer stores some checksums and validates the
> file
> > > before serving it to the TM.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ch...@apache.org>
> > > wrote:
> > >
> > > > The issue with corrupted files is that some of them aren't read by
> the
> > > > component that stores them.
> > > > For example, a file can be corrupted in the blob server of the JM,
> but
> > > > that it is corrupted will only be noticed by the TaskExecutor.
> > > >
> > > > On 15/12/2021 11:36, Till Rohrmann wrote:
> > > > > Thanks everyone for your feedback. Let me try to address it by
> > grouping
> > > > > some of the individual comments:
> > > > >
> > > > > ### Will this feature work for native Yarn and K8s deployments?
> > > > >
> > > > > The working directory is an optional feature that can be used to
> > > recover
> > > > > additional information. You can think of it like a cache. If the
> > > working
> > > > > directory is there, then Flink can do certain things a bit faster
> but
> > > in
> > > > > the worst case it will have to retrieve the required information
> from
> > > the
> > > > > JobManager or persistent storage.
> > > > >
> > > > > In order to make it work with native Yarn and K8s, we would have to
> > > > change
> > > > > these modes slightly. First of all, we would have to be able to map
> > > > working
> > > > > directories to processes and then set a deterministic resource ids
> > for
> > > > the
> > > > > processes. For K8s this could be easily achievable by using a
> > > StatefulSet
> > > > > as the deployment mechanism for TaskExecutors. For Yarn, we
> probably
> > > > would
> > > > > have to remember the prior locations of a process. Both things are
> > > > > potential follow ups that I don't want to tackle in this FLIP.
> > > > >
> > > > > If one of the modes configures the working directory to be on a
> full
> > or
> > > > > broken disk, then the process will fail. I think this is not all
> that
> > > > > different from the current state where some things in Flink will
> fail
> > > if
> > > > > they picked the wrong/full temporary directory (e.g. blob storage
> > > > > directory).
> > > > >
> > > > > ### Cleanup
> > > > >
> > > > > The working directory will be cleaned up if the Flink process is
> > > > gracefully
> > > > > shut down. This means that the JobManager process will clean it up
> if
> > > it
> > > > > runs in application mode and the job is terminated. SIGTERM and
> > SIGKILL
> > > > > signals will be treated as an ungraceful shutdown and therefore
> they
> > > > won't
> > > > > clean up the working directory. This means that we probably also
> > need a
> > > > > graceful way for shutting TaskManager processes down in the future
> > > > because
> > > > > right now they are in most cases killed in order to shut them down.
> > If
> > > > the
> > > > > user uses the tmp directory, then any left-over working directories
> > > will
> > > > be
> > > > > cleaned up with the next system restart. This is somewhat similar
> to
> > > how
> > > > > RocksDB's working directory is currently cleaned up as well.
> > > > >
> > > > > ### Corrupted files
> > > > >
> > > > > The working directory itself won't give you any guarantees. It will
> > be
> > > > the
> > > > > responsibility of the component that uses the working directory to
> > make
> > > > > sure that it can deal with corrupted files. E.g. if the component
> > > cannot
> > > > > read the file, then it should delete it and fall back to the remote
> > > > > storage/ground truth to retrieve the required information.
> > > > >
> > > > > I hope this could answer your questions. Let me know if you have
> more
> > > > > feedback.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com>
> > wrote:
> > > > >
> > > > >> I like the idea. It can reuse the disk to do many things. Isn't it
> > > only
> > > > >> for inner failover? If not, the cleaning may be a problem. Also,
> > many
> > > > >> resource components have their own disk schedule strategy.
> > > > >>
> > > > >> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
> > > > >>
> > > > >>> How do you intend to handle corrupted files, in particular due to
> > > > >>> process crashes during a write?
> > > > >>> Will all writes to a cached directory append some suffix (e.g.,
> > > > >>> ".pending") and do a rename?
> > > > >>>
> > > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
> > > > >>>> Hi everyone,
> > > > >>>>
> > > > >>>> I would like to start a discussion about introducing an explicit
> > > > working
> > > > >>>> directory for Flink processes that can be used to store
> > information
> > > > [1].
> > > > >>>> Per default this working directory will reside in the temporary
> > > > >>> directory
> > > > >>>> of the node Flink runs on. However, if configured to reside on a
> > > > >>> persistent
> > > > >>>> volume, then this information can be used to recover from
> > > process/node
> > > > >>>> failures. Moreover, such a working directory can be used to
> > > > consolidate
> > > > >>>> some of our other directories Flink creates under /tmp (e.g.
> > > > >>> blobStorage,
> > > > >>>> RocksDB working directory).
> > > > >>>>
> > > > >>>> Here is a draft PR that outlines the required changes [2].
> > > > >>>>
> > > > >>>> Looking forward to your feedback.
> > > > >>>>
> > > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > > > >>>> [2] https://github.com/apache/flink/pull/18083
> > > > >>>>
> > > > >>>> Cheers,
> > > > >>>> Till
> > > > >>>>
> > > > >>>
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by David Morávek <dm...@apache.org>.
Hi Till,

thanks for drafting this FLIP, I think it's really a valuable improvement.

Agreed with Yang, that YARN / k8s implementation should be out of scope of
this FLIP. Just few notes on the possible integrations:

For k8s, I think we can also benefit from this FLIP without StatefulSet. If
the pod crashes for some reason, it will be restarted -> it's still on the
same node, but it looses the state. This could be addressed by attaching an
ephemeral volume to the container [1]. This is somewhere it between the
current state & the persistent volume (this is where you need a
StatefulSet) approach, that could be expensive (depends on the
infrastructure).

Example:

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
spec:
  containers:
  - image: ...
    name: test-container
    volumeMounts:
    - mountPath: /cache
      name: cache-volume
  volumes:
  - name: cache-volume
    emptyDir: {}

For YARN, I don't think it's as simple as remembering prior locations. As
far as I remember the "restart from failure" results in a new container
being created and the storage is tied with a container's lifecycle and the
working directories are garbage collected right after the container FAILS /
FINISHES. We'd most likely have to leverage a new component (something
along the lines of how the shuffle services for YARN work), that runs
embedded in NodeManager and allows you to externalize files for
out-of-the-container-lifecycle use, and that ties their lifecycle with the
job.

As for the Chesnay's concern around corrupted files, are we sure that all
components can recover from a corrupted file? Could we for example have a
generic mechanism, that is reused by all the components writing to the
working directory (CRC + File)?

Other than that, I really like the FLIP and looking forward to have this
feature in Flink +1.

[1] https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/

Best,
D.

On Thu, Dec 16, 2021 at 3:10 AM Yang Wang <da...@gmail.com> wrote:

> I am afraid creating a dedicated StatefulSet for each TaskManager is too
> expensive and using a shared StatefulSet for all
> the TaskManagers is not flexible enough. Maybe setting a proper restart
> policy for TaskManager pods could benefit from
> this FLIP. But we might need to tackle some other issues, e.g. duplicated
> registration, etc.
>
> All in all, this is out of the scope of this FLIP. I agree we could leave
> it in the future FLIPs.
>
> I have no more concerns. +1
>
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> 于2021年12月15日周三 19:06写道:
>
> > This is true. But this is not a new problem and I think that Flink should
> > be susceptible to this problem already. One solution for this concrete
> case
> > could be that the BlobServer stores some checksums and validates the file
> > before serving it to the TM.
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ch...@apache.org>
> > wrote:
> >
> > > The issue with corrupted files is that some of them aren't read by the
> > > component that stores them.
> > > For example, a file can be corrupted in the blob server of the JM, but
> > > that it is corrupted will only be noticed by the TaskExecutor.
> > >
> > > On 15/12/2021 11:36, Till Rohrmann wrote:
> > > > Thanks everyone for your feedback. Let me try to address it by
> grouping
> > > > some of the individual comments:
> > > >
> > > > ### Will this feature work for native Yarn and K8s deployments?
> > > >
> > > > The working directory is an optional feature that can be used to
> > recover
> > > > additional information. You can think of it like a cache. If the
> > working
> > > > directory is there, then Flink can do certain things a bit faster but
> > in
> > > > the worst case it will have to retrieve the required information from
> > the
> > > > JobManager or persistent storage.
> > > >
> > > > In order to make it work with native Yarn and K8s, we would have to
> > > change
> > > > these modes slightly. First of all, we would have to be able to map
> > > working
> > > > directories to processes and then set a deterministic resource ids
> for
> > > the
> > > > processes. For K8s this could be easily achievable by using a
> > StatefulSet
> > > > as the deployment mechanism for TaskExecutors. For Yarn, we probably
> > > would
> > > > have to remember the prior locations of a process. Both things are
> > > > potential follow ups that I don't want to tackle in this FLIP.
> > > >
> > > > If one of the modes configures the working directory to be on a full
> or
> > > > broken disk, then the process will fail. I think this is not all that
> > > > different from the current state where some things in Flink will fail
> > if
> > > > they picked the wrong/full temporary directory (e.g. blob storage
> > > > directory).
> > > >
> > > > ### Cleanup
> > > >
> > > > The working directory will be cleaned up if the Flink process is
> > > gracefully
> > > > shut down. This means that the JobManager process will clean it up if
> > it
> > > > runs in application mode and the job is terminated. SIGTERM and
> SIGKILL
> > > > signals will be treated as an ungraceful shutdown and therefore they
> > > won't
> > > > clean up the working directory. This means that we probably also
> need a
> > > > graceful way for shutting TaskManager processes down in the future
> > > because
> > > > right now they are in most cases killed in order to shut them down.
> If
> > > the
> > > > user uses the tmp directory, then any left-over working directories
> > will
> > > be
> > > > cleaned up with the next system restart. This is somewhat similar to
> > how
> > > > RocksDB's working directory is currently cleaned up as well.
> > > >
> > > > ### Corrupted files
> > > >
> > > > The working directory itself won't give you any guarantees. It will
> be
> > > the
> > > > responsibility of the component that uses the working directory to
> make
> > > > sure that it can deal with corrupted files. E.g. if the component
> > cannot
> > > > read the file, then it should delete it and fall back to the remote
> > > > storage/ground truth to retrieve the required information.
> > > >
> > > > I hope this could answer your questions. Let me know if you have more
> > > > feedback.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com>
> wrote:
> > > >
> > > >> I like the idea. It can reuse the disk to do many things. Isn't it
> > only
> > > >> for inner failover? If not, the cleaning may be a problem. Also,
> many
> > > >> resource components have their own disk schedule strategy.
> > > >>
> > > >> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
> > > >>
> > > >>> How do you intend to handle corrupted files, in particular due to
> > > >>> process crashes during a write?
> > > >>> Will all writes to a cached directory append some suffix (e.g.,
> > > >>> ".pending") and do a rename?
> > > >>>
> > > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
> > > >>>> Hi everyone,
> > > >>>>
> > > >>>> I would like to start a discussion about introducing an explicit
> > > working
> > > >>>> directory for Flink processes that can be used to store
> information
> > > [1].
> > > >>>> Per default this working directory will reside in the temporary
> > > >>> directory
> > > >>>> of the node Flink runs on. However, if configured to reside on a
> > > >>> persistent
> > > >>>> volume, then this information can be used to recover from
> > process/node
> > > >>>> failures. Moreover, such a working directory can be used to
> > > consolidate
> > > >>>> some of our other directories Flink creates under /tmp (e.g.
> > > >>> blobStorage,
> > > >>>> RocksDB working directory).
> > > >>>>
> > > >>>> Here is a draft PR that outlines the required changes [2].
> > > >>>>
> > > >>>> Looking forward to your feedback.
> > > >>>>
> > > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > > >>>> [2] https://github.com/apache/flink/pull/18083
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Till
> > > >>>>
> > > >>>
> > >
> > >
> >
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Yang Wang <da...@gmail.com>.
I am afraid creating a dedicated StatefulSet for each TaskManager is too
expensive and using a shared StatefulSet for all
the TaskManagers is not flexible enough. Maybe setting a proper restart
policy for TaskManager pods could benefit from
this FLIP. But we might need to tackle some other issues, e.g. duplicated
registration, etc.

All in all, this is out of the scope of this FLIP. I agree we could leave
it in the future FLIPs.

I have no more concerns. +1


Best,
Yang

Till Rohrmann <tr...@apache.org> 于2021年12月15日周三 19:06写道:

> This is true. But this is not a new problem and I think that Flink should
> be susceptible to this problem already. One solution for this concrete case
> could be that the BlobServer stores some checksums and validates the file
> before serving it to the TM.
>
> Cheers,
> Till
>
> On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ch...@apache.org>
> wrote:
>
> > The issue with corrupted files is that some of them aren't read by the
> > component that stores them.
> > For example, a file can be corrupted in the blob server of the JM, but
> > that it is corrupted will only be noticed by the TaskExecutor.
> >
> > On 15/12/2021 11:36, Till Rohrmann wrote:
> > > Thanks everyone for your feedback. Let me try to address it by grouping
> > > some of the individual comments:
> > >
> > > ### Will this feature work for native Yarn and K8s deployments?
> > >
> > > The working directory is an optional feature that can be used to
> recover
> > > additional information. You can think of it like a cache. If the
> working
> > > directory is there, then Flink can do certain things a bit faster but
> in
> > > the worst case it will have to retrieve the required information from
> the
> > > JobManager or persistent storage.
> > >
> > > In order to make it work with native Yarn and K8s, we would have to
> > change
> > > these modes slightly. First of all, we would have to be able to map
> > working
> > > directories to processes and then set a deterministic resource ids for
> > the
> > > processes. For K8s this could be easily achievable by using a
> StatefulSet
> > > as the deployment mechanism for TaskExecutors. For Yarn, we probably
> > would
> > > have to remember the prior locations of a process. Both things are
> > > potential follow ups that I don't want to tackle in this FLIP.
> > >
> > > If one of the modes configures the working directory to be on a full or
> > > broken disk, then the process will fail. I think this is not all that
> > > different from the current state where some things in Flink will fail
> if
> > > they picked the wrong/full temporary directory (e.g. blob storage
> > > directory).
> > >
> > > ### Cleanup
> > >
> > > The working directory will be cleaned up if the Flink process is
> > gracefully
> > > shut down. This means that the JobManager process will clean it up if
> it
> > > runs in application mode and the job is terminated. SIGTERM and SIGKILL
> > > signals will be treated as an ungraceful shutdown and therefore they
> > won't
> > > clean up the working directory. This means that we probably also need a
> > > graceful way for shutting TaskManager processes down in the future
> > because
> > > right now they are in most cases killed in order to shut them down. If
> > the
> > > user uses the tmp directory, then any left-over working directories
> will
> > be
> > > cleaned up with the next system restart. This is somewhat similar to
> how
> > > RocksDB's working directory is currently cleaned up as well.
> > >
> > > ### Corrupted files
> > >
> > > The working directory itself won't give you any guarantees. It will be
> > the
> > > responsibility of the component that uses the working directory to make
> > > sure that it can deal with corrupted files. E.g. if the component
> cannot
> > > read the file, then it should delete it and fall back to the remote
> > > storage/ground truth to retrieve the required information.
> > >
> > > I hope this could answer your questions. Let me know if you have more
> > > feedback.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com> wrote:
> > >
> > >> I like the idea. It can reuse the disk to do many things. Isn't it
> only
> > >> for inner failover? If not, the cleaning may be a problem. Also, many
> > >> resource components have their own disk schedule strategy.
> > >>
> > >> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
> > >>
> > >>> How do you intend to handle corrupted files, in particular due to
> > >>> process crashes during a write?
> > >>> Will all writes to a cached directory append some suffix (e.g.,
> > >>> ".pending") and do a rename?
> > >>>
> > >>> On 10/12/2021 17:54, Till Rohrmann wrote:
> > >>>> Hi everyone,
> > >>>>
> > >>>> I would like to start a discussion about introducing an explicit
> > working
> > >>>> directory for Flink processes that can be used to store information
> > [1].
> > >>>> Per default this working directory will reside in the temporary
> > >>> directory
> > >>>> of the node Flink runs on. However, if configured to reside on a
> > >>> persistent
> > >>>> volume, then this information can be used to recover from
> process/node
> > >>>> failures. Moreover, such a working directory can be used to
> > consolidate
> > >>>> some of our other directories Flink creates under /tmp (e.g.
> > >>> blobStorage,
> > >>>> RocksDB working directory).
> > >>>>
> > >>>> Here is a draft PR that outlines the required changes [2].
> > >>>>
> > >>>> Looking forward to your feedback.
> > >>>>
> > >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > >>>> [2] https://github.com/apache/flink/pull/18083
> > >>>>
> > >>>> Cheers,
> > >>>> Till
> > >>>>
> > >>>
> >
> >
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Till Rohrmann <tr...@apache.org>.
This is true. But this is not a new problem and I think that Flink should
be susceptible to this problem already. One solution for this concrete case
could be that the BlobServer stores some checksums and validates the file
before serving it to the TM.

Cheers,
Till

On Wed, Dec 15, 2021 at 11:59 AM Chesnay Schepler <ch...@apache.org>
wrote:

> The issue with corrupted files is that some of them aren't read by the
> component that stores them.
> For example, a file can be corrupted in the blob server of the JM, but
> that it is corrupted will only be noticed by the TaskExecutor.
>
> On 15/12/2021 11:36, Till Rohrmann wrote:
> > Thanks everyone for your feedback. Let me try to address it by grouping
> > some of the individual comments:
> >
> > ### Will this feature work for native Yarn and K8s deployments?
> >
> > The working directory is an optional feature that can be used to recover
> > additional information. You can think of it like a cache. If the working
> > directory is there, then Flink can do certain things a bit faster but in
> > the worst case it will have to retrieve the required information from the
> > JobManager or persistent storage.
> >
> > In order to make it work with native Yarn and K8s, we would have to
> change
> > these modes slightly. First of all, we would have to be able to map
> working
> > directories to processes and then set a deterministic resource ids for
> the
> > processes. For K8s this could be easily achievable by using a StatefulSet
> > as the deployment mechanism for TaskExecutors. For Yarn, we probably
> would
> > have to remember the prior locations of a process. Both things are
> > potential follow ups that I don't want to tackle in this FLIP.
> >
> > If one of the modes configures the working directory to be on a full or
> > broken disk, then the process will fail. I think this is not all that
> > different from the current state where some things in Flink will fail if
> > they picked the wrong/full temporary directory (e.g. blob storage
> > directory).
> >
> > ### Cleanup
> >
> > The working directory will be cleaned up if the Flink process is
> gracefully
> > shut down. This means that the JobManager process will clean it up if it
> > runs in application mode and the job is terminated. SIGTERM and SIGKILL
> > signals will be treated as an ungraceful shutdown and therefore they
> won't
> > clean up the working directory. This means that we probably also need a
> > graceful way for shutting TaskManager processes down in the future
> because
> > right now they are in most cases killed in order to shut them down. If
> the
> > user uses the tmp directory, then any left-over working directories will
> be
> > cleaned up with the next system restart. This is somewhat similar to how
> > RocksDB's working directory is currently cleaned up as well.
> >
> > ### Corrupted files
> >
> > The working directory itself won't give you any guarantees. It will be
> the
> > responsibility of the component that uses the working directory to make
> > sure that it can deal with corrupted files. E.g. if the component cannot
> > read the file, then it should delete it and fall back to the remote
> > storage/ground truth to retrieve the required information.
> >
> > I hope this could answer your questions. Let me know if you have more
> > feedback.
> >
> > Cheers,
> > Till
> >
> > On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com> wrote:
> >
> >> I like the idea. It can reuse the disk to do many things. Isn't it only
> >> for inner failover? If not, the cleaning may be a problem. Also, many
> >> resource components have their own disk schedule strategy.
> >>
> >> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
> >>
> >>> How do you intend to handle corrupted files, in particular due to
> >>> process crashes during a write?
> >>> Will all writes to a cached directory append some suffix (e.g.,
> >>> ".pending") and do a rename?
> >>>
> >>> On 10/12/2021 17:54, Till Rohrmann wrote:
> >>>> Hi everyone,
> >>>>
> >>>> I would like to start a discussion about introducing an explicit
> working
> >>>> directory for Flink processes that can be used to store information
> [1].
> >>>> Per default this working directory will reside in the temporary
> >>> directory
> >>>> of the node Flink runs on. However, if configured to reside on a
> >>> persistent
> >>>> volume, then this information can be used to recover from process/node
> >>>> failures. Moreover, such a working directory can be used to
> consolidate
> >>>> some of our other directories Flink creates under /tmp (e.g.
> >>> blobStorage,
> >>>> RocksDB working directory).
> >>>>
> >>>> Here is a draft PR that outlines the required changes [2].
> >>>>
> >>>> Looking forward to your feedback.
> >>>>
> >>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> >>>> [2] https://github.com/apache/flink/pull/18083
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>
>
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Chesnay Schepler <ch...@apache.org>.
The issue with corrupted files is that some of them aren't read by the 
component that stores them.
For example, a file can be corrupted in the blob server of the JM, but 
that it is corrupted will only be noticed by the TaskExecutor.

On 15/12/2021 11:36, Till Rohrmann wrote:
> Thanks everyone for your feedback. Let me try to address it by grouping
> some of the individual comments:
>
> ### Will this feature work for native Yarn and K8s deployments?
>
> The working directory is an optional feature that can be used to recover
> additional information. You can think of it like a cache. If the working
> directory is there, then Flink can do certain things a bit faster but in
> the worst case it will have to retrieve the required information from the
> JobManager or persistent storage.
>
> In order to make it work with native Yarn and K8s, we would have to change
> these modes slightly. First of all, we would have to be able to map working
> directories to processes and then set a deterministic resource ids for the
> processes. For K8s this could be easily achievable by using a StatefulSet
> as the deployment mechanism for TaskExecutors. For Yarn, we probably would
> have to remember the prior locations of a process. Both things are
> potential follow ups that I don't want to tackle in this FLIP.
>
> If one of the modes configures the working directory to be on a full or
> broken disk, then the process will fail. I think this is not all that
> different from the current state where some things in Flink will fail if
> they picked the wrong/full temporary directory (e.g. blob storage
> directory).
>
> ### Cleanup
>
> The working directory will be cleaned up if the Flink process is gracefully
> shut down. This means that the JobManager process will clean it up if it
> runs in application mode and the job is terminated. SIGTERM and SIGKILL
> signals will be treated as an ungraceful shutdown and therefore they won't
> clean up the working directory. This means that we probably also need a
> graceful way for shutting TaskManager processes down in the future because
> right now they are in most cases killed in order to shut them down. If the
> user uses the tmp directory, then any left-over working directories will be
> cleaned up with the next system restart. This is somewhat similar to how
> RocksDB's working directory is currently cleaned up as well.
>
> ### Corrupted files
>
> The working directory itself won't give you any guarantees. It will be the
> responsibility of the component that uses the working directory to make
> sure that it can deal with corrupted files. E.g. if the component cannot
> read the file, then it should delete it and fall back to the remote
> storage/ground truth to retrieve the required information.
>
> I hope this could answer your questions. Let me know if you have more
> feedback.
>
> Cheers,
> Till
>
> On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com> wrote:
>
>> I like the idea. It can reuse the disk to do many things. Isn't it only
>> for inner failover? If not, the cleaning may be a problem. Also, many
>> resource components have their own disk schedule strategy.
>>
>> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
>>
>>> How do you intend to handle corrupted files, in particular due to
>>> process crashes during a write?
>>> Will all writes to a cached directory append some suffix (e.g.,
>>> ".pending") and do a rename?
>>>
>>> On 10/12/2021 17:54, Till Rohrmann wrote:
>>>> Hi everyone,
>>>>
>>>> I would like to start a discussion about introducing an explicit working
>>>> directory for Flink processes that can be used to store information [1].
>>>> Per default this working directory will reside in the temporary
>>> directory
>>>> of the node Flink runs on. However, if configured to reside on a
>>> persistent
>>>> volume, then this information can be used to recover from process/node
>>>> failures. Moreover, such a working directory can be used to consolidate
>>>> some of our other directories Flink creates under /tmp (e.g.
>>> blobStorage,
>>>> RocksDB working directory).
>>>>
>>>> Here is a draft PR that outlines the required changes [2].
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>>>> [2] https://github.com/apache/flink/pull/18083
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>


Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Till Rohrmann <tr...@apache.org>.
Thanks everyone for your feedback. Let me try to address it by grouping
some of the individual comments:

### Will this feature work for native Yarn and K8s deployments?

The working directory is an optional feature that can be used to recover
additional information. You can think of it like a cache. If the working
directory is there, then Flink can do certain things a bit faster but in
the worst case it will have to retrieve the required information from the
JobManager or persistent storage.

In order to make it work with native Yarn and K8s, we would have to change
these modes slightly. First of all, we would have to be able to map working
directories to processes and then set a deterministic resource ids for the
processes. For K8s this could be easily achievable by using a StatefulSet
as the deployment mechanism for TaskExecutors. For Yarn, we probably would
have to remember the prior locations of a process. Both things are
potential follow ups that I don't want to tackle in this FLIP.

If one of the modes configures the working directory to be on a full or
broken disk, then the process will fail. I think this is not all that
different from the current state where some things in Flink will fail if
they picked the wrong/full temporary directory (e.g. blob storage
directory).

### Cleanup

The working directory will be cleaned up if the Flink process is gracefully
shut down. This means that the JobManager process will clean it up if it
runs in application mode and the job is terminated. SIGTERM and SIGKILL
signals will be treated as an ungraceful shutdown and therefore they won't
clean up the working directory. This means that we probably also need a
graceful way for shutting TaskManager processes down in the future because
right now they are in most cases killed in order to shut them down. If the
user uses the tmp directory, then any left-over working directories will be
cleaned up with the next system restart. This is somewhat similar to how
RocksDB's working directory is currently cleaned up as well.

### Corrupted files

The working directory itself won't give you any guarantees. It will be the
responsibility of the component that uses the working directory to make
sure that it can deal with corrupted files. E.g. if the component cannot
read the file, then it should delete it and fall back to the remote
storage/ground truth to retrieve the required information.

I hope this could answer your questions. Let me know if you have more
feedback.

Cheers,
Till

On Mon, Dec 13, 2021 at 5:05 AM 刘建刚 <li...@gmail.com> wrote:

> I like the idea. It can reuse the disk to do many things. Isn't it only
> for inner failover? If not, the cleaning may be a problem. Also, many
> resource components have their own disk schedule strategy.
>
> Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:
>
>> How do you intend to handle corrupted files, in particular due to
>> process crashes during a write?
>> Will all writes to a cached directory append some suffix (e.g.,
>> ".pending") and do a rename?
>>
>> On 10/12/2021 17:54, Till Rohrmann wrote:
>> > Hi everyone,
>> >
>> > I would like to start a discussion about introducing an explicit working
>> > directory for Flink processes that can be used to store information [1].
>> > Per default this working directory will reside in the temporary
>> directory
>> > of the node Flink runs on. However, if configured to reside on a
>> persistent
>> > volume, then this information can be used to recover from process/node
>> > failures. Moreover, such a working directory can be used to consolidate
>> > some of our other directories Flink creates under /tmp (e.g.
>> blobStorage,
>> > RocksDB working directory).
>> >
>> > Here is a draft PR that outlines the required changes [2].
>> >
>> > Looking forward to your feedback.
>> >
>> > [1] https://cwiki.apache.org/confluence/x/ZZiqCw
>> > [2] https://github.com/apache/flink/pull/18083
>> >
>> > Cheers,
>> > Till
>> >
>>
>>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by 刘建刚 <li...@gmail.com>.
I like the idea. It can reuse the disk to do many things. Isn't it only for
inner failover? If not, the cleaning may be a problem. Also, many resource
components have their own disk schedule strategy.

Chesnay Schepler <ch...@apache.org> 于2021年12月12日周日 19:59写道:

> How do you intend to handle corrupted files, in particular due to
> process crashes during a write?
> Will all writes to a cached directory append some suffix (e.g.,
> ".pending") and do a rename?
>
> On 10/12/2021 17:54, Till Rohrmann wrote:
> > Hi everyone,
> >
> > I would like to start a discussion about introducing an explicit working
> > directory for Flink processes that can be used to store information [1].
> > Per default this working directory will reside in the temporary directory
> > of the node Flink runs on. However, if configured to reside on a
> persistent
> > volume, then this information can be used to recover from process/node
> > failures. Moreover, such a working directory can be used to consolidate
> > some of our other directories Flink creates under /tmp (e.g. blobStorage,
> > RocksDB working directory).
> >
> > Here is a draft PR that outlines the required changes [2].
> >
> > Looking forward to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> > [2] https://github.com/apache/flink/pull/18083
> >
> > Cheers,
> > Till
> >
>
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Chesnay Schepler <ch...@apache.org>.
How do you intend to handle corrupted files, in particular due to 
process crashes during a write?
Will all writes to a cached directory append some suffix (e.g., 
".pending") and do a rename?

On 10/12/2021 17:54, Till Rohrmann wrote:
> Hi everyone,
>
> I would like to start a discussion about introducing an explicit working
> directory for Flink processes that can be used to store information [1].
> Per default this working directory will reside in the temporary directory
> of the node Flink runs on. However, if configured to reside on a persistent
> volume, then this information can be used to recover from process/node
> failures. Moreover, such a working directory can be used to consolidate
> some of our other directories Flink creates under /tmp (e.g. blobStorage,
> RocksDB working directory).
>
> Here is a draft PR that outlines the required changes [2].
>
> Looking forward to your feedback.
>
> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> [2] https://github.com/apache/flink/pull/18083
>
> Cheers,
> Till
>


Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Yang Wang <da...@gmail.com>.
Thanks Till for creating this FLIP.

I believe the feature is really useful for standalone K8s deployment with
persist volume. For native K8s and Yarn deployment,
Flink ResourceManager will create a new TaskManager with new resource id.
So we still could not benefit from this FLIP.

Moreover, I am curious about the clean-up mechanism. Will the working
directory be deleted once the Flink job reached
globally terminal state? Or it needs to be deleted externally.


Best,
Yang


Yun Tang <my...@live.com> 于2021年12月11日周六 下午10:48写道:

> Hi Till,
>
> Thanks for driving this topic. I think this FLIP is very important to let
> us could enable local recovery [1] by default.
>
> We previously also took similar method to make the working directory to
> let local state dir as the same as state-backend's local dir to ensure
> local recovery could well.
>
> I noticed that this FLIP also want to make the working directory the same
> even process failure so that restarted processor could also take the old
> one. However, I think there might exist some problems in YARN environment.
> YARN would select all the local directories on different disks as the
> 'LOCAL_DIRS' to represent the "io.tmp.dirs" [2]. To allow the reuse of same
> old working directory, we need to always select the same directory from all
> disk candidates for the specific resource. Thus, we might need to store the
> working directory location persistently. If we use hash or similar method
> to calculate which directory would always be used as the working directory
> for specific 'resource id', it might meet problem if one of the disks is
> temporarily full or broken.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-15507
> [2]
> https://github.com/apache/flink/blob/cf1e8c39111378735e4c05a5edb3bd713229bb08/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L363
>
> Best
> Yun Tang
> ________________________________
> From: Till Rohrmann <tr...@apache.org>
> Sent: Saturday, December 11, 2021 0:54
> To: dev <de...@flink.apache.org>
> Subject: [DISCUSS] FLIP-198: Working directory for Flink processes
>
> Hi everyone,
>
> I would like to start a discussion about introducing an explicit working
> directory for Flink processes that can be used to store information [1].
> Per default this working directory will reside in the temporary directory
> of the node Flink runs on. However, if configured to reside on a persistent
> volume, then this information can be used to recover from process/node
> failures. Moreover, such a working directory can be used to consolidate
> some of our other directories Flink creates under /tmp (e.g. blobStorage,
> RocksDB working directory).
>
> Here is a draft PR that outlines the required changes [2].
>
> Looking forward to your feedback.
>
> [1] https://cwiki.apache.org/confluence/x/ZZiqCw
> [2] https://github.com/apache/flink/pull/18083
>
> Cheers,
> Till
>

Re: [DISCUSS] FLIP-198: Working directory for Flink processes

Posted by Yun Tang <my...@live.com>.
Hi Till,

Thanks for driving this topic. I think this FLIP is very important to let us could enable local recovery [1] by default.

We previously also took similar method to make the working directory to let local state dir as the same as state-backend's local dir to ensure local recovery could well.

I noticed that this FLIP also want to make the working directory the same even process failure so that restarted processor could also take the old one. However, I think there might exist some problems in YARN environment. YARN would select all the local directories on different disks as the 'LOCAL_DIRS' to represent the "io.tmp.dirs" [2]. To allow the reuse of same old working directory, we need to always select the same directory from all disk candidates for the specific resource. Thus, we might need to store the working directory location persistently. If we use hash or similar method to calculate which directory would always be used as the working directory for specific 'resource id', it might meet problem if one of the disks is temporarily full or broken.



[1] https://issues.apache.org/jira/browse/FLINK-15507
[2] https://github.com/apache/flink/blob/cf1e8c39111378735e4c05a5edb3bd713229bb08/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java#L363

Best
Yun Tang
________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Saturday, December 11, 2021 0:54
To: dev <de...@flink.apache.org>
Subject: [DISCUSS] FLIP-198: Working directory for Flink processes

Hi everyone,

I would like to start a discussion about introducing an explicit working
directory for Flink processes that can be used to store information [1].
Per default this working directory will reside in the temporary directory
of the node Flink runs on. However, if configured to reside on a persistent
volume, then this information can be used to recover from process/node
failures. Moreover, such a working directory can be used to consolidate
some of our other directories Flink creates under /tmp (e.g. blobStorage,
RocksDB working directory).

Here is a draft PR that outlines the required changes [2].

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/ZZiqCw
[2] https://github.com/apache/flink/pull/18083

Cheers,
Till