Posted to dev@flink.apache.org by Boyuan Zhang <bo...@apache.org> on 2020/09/28 19:44:23 UTC

How to clean up resources in a UDF?

Hi team,

I'm building a UDF by extending AbstractRichFunction, where I want to do
some resource cleanup per input element once the processing result is
committed. In streaming, I can perform such cleanup by implementing
*CheckpointListener.notifyCheckpointComplete()*, but it seems there is
no checkpoint mechanism in batch processing.
Is *AbstractRichFunction.close()* a good place to do so?
How does Flink deal with fault tolerance in batch?

Thanks for your help!
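
For readers following the thread, here is a minimal sketch of the streaming-side pattern described above: per-element resources are held back until a checkpoint that covers them completes. It is plain Java with no Flink dependency. The class name CheckpointCleanupTracker and its methods register(), snapshot(), released() and pendingCheckpoints() are hypothetical names chosen for illustration. In a real UDF, snapshot() would presumably be driven from CheckpointedFunction.snapshotState() and the release from CheckpointListener.notifyCheckpointComplete(long checkpointId).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: holds per-element resources until a covering checkpoint completes. */
final class CheckpointCleanupTracker {
    // Resources registered since the last snapshot.
    private List<String> current = new ArrayList<>();
    // Resources waiting for a checkpoint to complete, keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Records what was cleaned up; a real UDF would delete files, ack messages, etc.
    private final List<String> released = new ArrayList<>();

    /** Call once per processed element. */
    void register(String resourceId) {
        current.add(resourceId);
    }

    /** Call when a checkpoint is taken: everything seen so far belongs to it. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Call when a checkpoint completes: release its resources and those of earlier ones. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> batch : done.values()) {
            released.addAll(batch);
        }
        done.clear(); // headMap is a view, so this also removes the entries from pending
    }

    List<String> released() {
        return released;
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

Releasing everything up to and including the completed checkpoint id is a design choice of this sketch: it keeps resources from leaking if the notification for an earlier checkpoint never arrives.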

Re: How to clean up resources in a UDF?

Posted by Boyuan Zhang <bo...@apache.org>.
Thanks, Aljoscha! The context is really helpful!

On Fri, Oct 2, 2020 at 1:19 AM Aljoscha Krettek <al...@apache.org> wrote:

> Unfortunately, there is no such hook right now. However, we're working
> on this in the context of FLIP-134 [1] and FLIP-143 [2].
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/x/4i94CQ
> [2] https://cwiki.apache.org/confluence/x/KEJ4CQ

Re: How to clean up resources in a UDF?

Posted by Aljoscha Krettek <al...@apache.org>.
Unfortunately, there is no such hook right now. However, we're working 
on this in the context of FLIP-134 [1] and FLIP-143 [2].

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/x/4i94CQ
[2] https://cwiki.apache.org/confluence/x/KEJ4CQ

On 01.10.20 20:35, Boyuan Zhang wrote:
> Thanks, Aljoscha! That's really helpful.
> 
> I think I only want to do my cleanup when the task successfully finishes,
> which means the cleanup should only be invoked when the task is
> guaranteed not to be executed again in one given batch execution. Is there
> any way to do so?
> 
> Thanks for your help!


Re: How to clean up resources in a UDF?

Posted by Boyuan Zhang <bo...@apache.org>.
Thanks, Aljoscha! That's really helpful.

I think I only want to do my cleanup when the task finishes successfully,
which means the cleanup should be invoked only once the task is
guaranteed not to be executed again within a given batch execution. Is there
any way to do so?

Thanks for your help!

On Thu, Oct 1, 2020 at 2:55 AM Aljoscha Krettek <al...@apache.org> wrote:

> Hi!
>
> Yes, AbstractRichFunction.close() would be the right place to do
> cleanup. This method is called both in case of successful finishing and
> also in the case of failures.
>
> For BATCH execution, Flink will do backtracking upwards from the failed
> task(s) to see if intermediate results from previous tasks are still
> available. If they are available, computation can restart from there.
> Otherwise the whole job will have to be restarted.
>
> Best,
> Aljoscha

Re: How to clean up resources in a UDF?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi!

Yes, AbstractRichFunction.close() would be the right place to do
cleanup. This method is called both when a task finishes successfully
and when it fails.
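
Since close() runs on both paths, cleanup code placed there has to cope with partially processed input and should be safe to call more than once. A minimal sketch of that shape follows, in plain Java with no Flink dependency. TempResourceFunction and its methods are hypothetical stand-ins for a rich function's per-element method and its close(); in Flink itself, close() is declared on RichFunction as `void close() throws Exception`.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a rich function: per-element work plus close(). */
final class TempResourceFunction implements AutoCloseable {
    // Resources created per element and not yet cleaned up.
    private final Set<String> open = new HashSet<>();
    private boolean closed = false;

    /** Per-element processing: acquires a resource, then may fail. */
    String map(String element) {
        open.add("tmp-" + element);
        if (element.isEmpty()) {
            throw new IllegalArgumentException("empty element");
        }
        return element.toUpperCase();
    }

    /** Runs after success and after failure, so it tolerates partial state and repeated calls. */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        open.clear(); // a real UDF would delete files, close connections, etc.
    }

    int openResources() {
        return open.size();
    }

    boolean isClosed() {
        return closed;
    }
}
```

Note that nothing in close() tells the function whether the task succeeded or failed, so this shape suits cleanup that is correct in both cases.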

For BATCH execution, Flink backtracks upstream from the failed
task(s) to check whether intermediate results from previous tasks are still
available. If they are, computation can restart from there.
Otherwise, the whole job has to be restarted.
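
The backtracking idea can be illustrated with a toy model. This is not Flink's scheduler code, only a sketch under simplifying assumptions: tasks are plain strings, `inputs` maps each task to the upstream tasks it reads from, and `available` is the set of tasks whose intermediate result can still be read. BatchBacktracking and tasksToRestart are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of backtracking from a failed task over a DAG of tasks. */
final class BatchBacktracking {
    /**
     * @param inputs     for each task, the upstream tasks whose results it reads
     * @param available  tasks whose intermediate result can still be read
     * @param failedTask the task that failed
     * @return the tasks that have to run again, the failed task included
     */
    static Set<String> tasksToRestart(Map<String, List<String>> inputs,
                                      Set<String> available,
                                      String failedTask) {
        Set<String> restart = new LinkedHashSet<>();
        Deque<String> toVisit = new ArrayDeque<>();
        toVisit.push(failedTask);
        while (!toVisit.isEmpty()) {
            String task = toVisit.pop();
            if (!restart.add(task)) {
                continue; // already scheduled for restart
            }
            for (String producer : inputs.getOrDefault(task, List.of())) {
                // A producer whose result is still available does not need to run again.
                if (!available.contains(producer)) {
                    toVisit.push(producer);
                }
            }
        }
        return restart;
    }
}
```

For a chain source -> map -> sink where the sink fails, only the sink reruns if the result of map is still available; if no intermediate result survives, the walk reaches the source and every task runs again, which corresponds to restarting the whole job.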

Best,
Aljoscha

On 28.09.20 21:44, Boyuan Zhang wrote:
> Hi team,
> 
> I'm building a UDF by implementing AbstractRichFunction, where I want to do
> some resource cleanup per input element when the processing result is
> committed. I can perform such cleanup in streaming by implementing
> *CheckpointListener.notifyCheckpointComplete()* but it seems like there is
> no checkpoint mechanism in batch processing.
> I'm wondering is *AbstractRichFunction.close()* the good place to do so?
> How does flink deal with fault tolerance in batch?
> 
> Thanks for your help!
>