Posted to dev@flink.apache.org by Lincoln Lee <li...@gmail.com> on 2022/09/14 06:21:19 UTC

[DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Hello everyone,

  I’d like to open a discussion on FLIP-260 [1]: Expose Finish Method For
UserDefinedFunction. It gives users who rely on finishing logic in the
legacy close() method (Flink < 1.14) a way to migrate to the new finish()
method.

  The task lifecycle was changed in FLINK-22972 [2]: a new finish() phase
was introduced (the ‘finish’ part was extracted out of ‘close’), and the
dispose() method was removed. The change was also applied in the table
module (e.g., `AbstractMapBundleOperator` for mini-batch operations), but
it did not cover UserDefinedFunction, which only exposes the open() and
close() APIs for custom logic. Users who rely on the legacy close() API
may therefore get wrong results or hit runtime errors after upgrading to
the new version. Strictly speaking, this is a bug caused by the breaking
change, but since fixing it changes a public API, we propose this FLIP.
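
The lifecycle change above can be illustrated with a small, self-contained Java model. It does not use Flink at all: `Output`, `LifecycleFunction`, `BufferingFunction` and `runTask` are hypothetical names, and the model simply assumes that the downstream output is shut down between finish() and close(). Under that assumption, a function that flushes its buffer in close() loses the records (the model throws to make this visible), while the same flush done in finish() reaches the output.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical downstream output; in this model it rejects records once closed. */
    static final class Output {
        final List<String> records = new ArrayList<>();
        private boolean closed = false;

        void emit(String record) {
            if (closed) {
                throw new IllegalStateException("output already closed, dropped: " + record);
            }
            records.add(record);
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical user function with open / eval / finish / close hooks. */
    abstract static class LifecycleFunction {
        protected Output out;

        void open(Output out) { this.out = out; }
        abstract void eval(String in);
        void finish() {}  // end of input: output is still open, may emit
        void close() {}   // cleanup only: output is assumed to be gone
    }

    /** Buffers all input and flushes it at end of input, either in finish() or in close(). */
    static final class BufferingFunction extends LifecycleFunction {
        private final List<String> buffer = new ArrayList<>();
        private final boolean flushInFinish;

        BufferingFunction(boolean flushInFinish) { this.flushInFinish = flushInFinish; }

        @Override void eval(String in) { buffer.add(in); }

        @Override void finish() { if (flushInFinish) flush(); }

        @Override void close() { if (!flushInFinish) flush(); }  // legacy pattern

        private void flush() {
            for (String r : buffer) {
                out.emit(r);
            }
            buffer.clear();
        }
    }

    /** Models the task driver: finish() runs while the output is open, close() after it is closed. */
    static List<String> runTask(LifecycleFunction fn, List<String> input) {
        Output out = new Output();
        fn.open(out);
        for (String r : input) {
            fn.eval(r);
        }
        fn.finish();
        out.close();
        fn.close();
        return out.records;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c");
        System.out.println(runTask(new BufferingFunction(true), input));  // prints [a, b, c]
        try {
            runTask(new BufferingFunction(false), input);
        } catch (IllegalStateException e) {
            System.out.println("legacy close() flush failed: " + e.getMessage());
        }
    }
}
```

Throwing on emit-after-close is only a choice of this model; in a real job the symptom may instead be missing records or a runtime error, as described above.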

  Looking forward to your comments or feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
[2] https://issues.apache.org/jira/browse/FLINK-22972

Best,
Lincoln Lee

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Lincoln Lee <li...@gmail.com>.
Hi everyone,

Thanks for all your feedback! I have started a vote for this FLIP [1];
please vote there or ask additional questions here [2].

[1] https://lists.apache.org/thread/nr9wwf98fkw1tk7ycgbcfjjo5g4x8pmz
[2] https://lists.apache.org/thread/m9hj60p3mntyctkbxrksm8l4d0s4q9dw

Best,
Lincoln Lee


On Tue, Sep 20, 2022 at 18:21, Piotr Nowojski <pn...@apache.org> wrote:

> Fine by me. Thanks for driving this Lincoln :)
>
> Best, Piotrek
>
> On Tue, Sep 20, 2022 at 09:06, Lincoln Lee <li...@gmail.com> wrote:
>
> > Hi all,
> >    I'll start a vote if there are no more objections by this
> > Thursday (Sep 22). Looking forward to your feedback!
> >
> > [1] FLIP-260:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> > [2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, Sep 19, 2022 at 17:38, Lincoln Lee <li...@gmail.com> wrote:
> >
> > > Hi Jingsong,
> > >    Thank you for participating in this discussion! For the method name, I
> > > think we should follow the new finish() method in `StreamOperator`,
> > > since `BoundedOneInput` might be removed in the future, as discussed
> > > before [1].
> > >
> > > [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > On Mon, Sep 19, 2022 at 10:13, Jingsong Li <ji...@gmail.com> wrote:
> > >
> > >> +1 to adding a `finish()` method to `TableFunction` only.
> > >>
> > >> Can we use `endInput` just like `BoundedOneInput`?
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee <li...@gmail.com>
> > >> wrote:
> > >> >
> > >> > Hi Dawid, Piotr,
> > >> >    I agree with you that we should add the finish() method to
> > >> > `TableFunction` only. Other `UserDefinedFunction`s (`ScalarFunction`,
> > >> > `AggregateFunction`, `TableAggregateFunction`) do not need a finish()
> > >> > method, since they cannot emit records in the legacy close() method.
> > >> >
> > >> > A `TableFunction` is used to correlate with the left table/stream.
> > >> > The following example shows a case where the user selects only
> > >> > columns from the correlated 'FeatureTF' (no column of the left table
> > >> > is selected):
> > >> > ```
> > >> > SELECT feature1, feature2, feature3
> > >> > FROM MyTable t1
> > >> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2,
> > >> > feature3) ON TRUE
> > >> > ```
> > >> > 'FeatureTF' can do some flushing work in the legacy close() method
> > >> > without breaking any SQL semantics, so I don't see any reason to
> > >> > forbid users from doing the same flushing work in the new finish()
> > >> > method. I've updated the FLIP doc to limit the change to
> > >> > `TableFunction` only [1].
> > >> >
> > >> > Regarding the more powerful `ProcessFunction`, I'd like to share some
> > >> > thoughts: there are indeed requirements for advanced usage in
> > >> > Table/SQL, even for a user-defined operator, e.g., a UD-Join for
> > >> > user-controlled join logic that cannot be expressed simply in SQL.
> > >> > This is an interesting topic, and I expect more discussion on it.
> > >> >
> > >> >
> > >> > [1]
> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
> > >> >
> > >> > Best,
> > >> > Lincoln Lee
> > >> >
> > >> >
> > >> > On Thu, Sep 15, 2022 at 22:39, Piotr Nowojski <pn...@apache.org> wrote:
> > >> >
> > >> > > Hi Dawid, Lincoln,
> > >> > >
> > >> > > I would tend to agree with Dawid. It seems to me that `TableFunction`
> > >> > > is the one that needs to be taken care of. Other types of
> > >> > > `UserDefinedFunction` wouldn't be able to emit anything from
> > >> > > `finish()` even if we added it. And if we added
> > >> > > `finish(Collector<T> out)` to them, it would create the same problem
> > >> > > (how to pass the output type) that prevented us from adding
> > >> > > `finish()` to all functions in the DataStream API.
> > >> > >
> > >> > > However, I'm not sure what the long-term solution for the Table API
> > >> > > should be. For the DataStream API, we wanted to provide a new, better
> > >> > > and more powerful `ProcessFunction` for all of the unusual use cases
> > >> > > that currently require the `StreamOperator` API instead of
> > >> > > `DataStream` functions. I don't know what the alternative would be in
> > >> > > the Table API.
> > >> > >
> > >> > > Dawid, who do you think we should ping from the Table API/SQL teams
> > >> > > to chip in?
> > >> > >
> > >> > > Best,
> > >> > > Piotrek
> > >> > >
> > >> > > On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz <dwysakowicz@apache.org>
> > >> > > wrote:
> > >> > >
> > >> > > > Hey Lincoln,
> > >> > > >
> > >> > > > Thanks for opening the discussion.
> > >> > > >
> > >> > > > To be honest, I am not convinced that emitting from close() is a
> > >> > > > contract that was envisioned and thus should be maintained. As far
> > >> > > > as I can see, it affects only the TableFunction, because it has the
> > >> > > > collect() method. None of the other UDFs (ScalarFunction,
> > >> > > > AggregateFunction) have a means to emit records from close().
> > >> > > >
> > >> > > > To be honest, I am not sure what the consequences would be for the
> > >> > > > interplay with other operators that expect a TableFunction to emit
> > >> > > > only when eval() is called, or whether such operators exist.
> > >> > > >
> > >> > > > If this is something we are certain we want to support, I'd be much
> > >> > > > more comfortable adding finish() to the TableFunction instead. I
> > >> > > > would be happy to hear opinions from the Table API folks.
> > >> > > >
> > >> > > > Best,
> > >> > > >
> > >> > > > Dawid
> > >> > > >
> > >> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
> > >> > > > > Thanks @Piotr for your valuable input!
> > >> > > > >
> > >> > > > > I did a quick read of the previous discussion you mentioned. It
> > >> > > > > seems my FLIP title doesn't make the scope clear and caused some
> > >> > > > > confusion. If my understanding is correct, the UDFs in your
> > >> > > > > context are user-implemented
> > >> > > > > `org.apache.flink.api.common.functions.Function`s, while the
> > >> > > > > `UserDefinedFunction` I mentioned in the FLIP is limited to the
> > >> > > > > flink-table module, located in `org.apache.flink.table.functions`.
> > >> > > > >
> > >> > > > > Here's a use case we've met recently (which is indeed the
> > >> > > > > motivation for this proposal): one of our users implemented an
> > >> > > > > `org.apache.flink.table.functions.TableFunction`; the simplified
> > >> > > > > pseudo-code is as below:
> > >> > > > >
> > >> > > > > ```
> > >> > > > > class XFunction extends TableFunction<Out> {
> > >> > > > >
> > >> > > > >    @Override
> > >> > > > >    public void open(FunctionContext context) {
> > >> > > > >        initMemQueue();
> > >> > > > >        initPythonProc();  // external process that turns inputs into outputs
> > >> > > > >    }
> > >> > > > >
> > >> > > > >    public void eval(In in) {
> > >> > > > >        queue.offer(in);
> > >> > > > >        Out out = queue.poll();  // may be null while results are in flight
> > >> > > > >        if (out != null) {
> > >> > > > >          collect(out);
> > >> > > > >        }
> > >> > > > >    }
> > >> > > > >
> > >> > > > >    @Override
> > >> > > > >    public void close() {
> > >> > > > >        // end-of-input flush: the logic that broke after the upgrade
> > >> > > > >        waitForPythonFinish();
> > >> > > > >        List<Out> outputs = drainQueue();
> > >> > > > >        outputs.forEach(out -> collect(out));
> > >> > > > >    }
> > >> > > > > }
> > >> > > > > ```
> > >> > > > > It worked well in older Flink versions, but when they recently
> > >> > > > > attempted an upgrade, the 'flush' logic in the legacy close()
> > >> > > > > method of the `TableFunction` no longer worked properly.
> > >> > > > >
> > >> > > > > Before proposing the FLIP, I also considered a `flush()` extension
> > >> > > > > on `org.apache.flink.api.common.functions.Function`, because some
> > >> > > > > SQL operators are also affected, but it is currently not included
> > >> > > > > in the scope of this FLIP; maybe we can discuss it in another
> > >> > > > > thread.
> > >> > > > >
> > >> > > > > I hope this helps explain the reason, and I welcome your comments
> > >> > > > > here!
> > >> > > > >
> > >> > > > > Best,
> > >> > > > > Lincoln Lee
> > >> > > > >
> > >> > > > >
> > >> > > > > On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pn...@apache.org> wrote:
> > >> > > > >
> > >> > > > >> Hi Lincoln,
> > >> > > > >>
> > >> > > > >> Thanks for the proposal. Have you seen the old discussion about
> > >> > > > >> adding this `finish()` method? [1] We didn't add it to UDFs, as
> > >> > > > >> we didn't see a motivation (maybe we missed something), and at
> > >> > > > >> the same time it wasn't that easy. A plain `finish()` wouldn't be
> > >> > > > >> enough. Users would need a way to output records from the
> > >> > > > >> `finish()` call, so it would have to be typed with the user
> > >> > > > >> record (`finish(Collector<T> output)`). On the other hand, we
> > >> > > > >> couldn't find an example where a user would actually need the
> > >> > > > >> `finish()` call in a UDF, as it seemed to us that it only makes
> > >> > > > >> sense for operators/functions that buffer records. Note that back
> > >> > > > >> then, during the discussion, we were referring to this method as
> > >> > > > >> `flush()` or `drain()`.
> > >> > > > >>
> > >> > > > >> Can you shed some more light and provide more details on the
> > >> > > > >> exact motivating example behind this proposal?
> > >> > > > >>
> > >> > > > >> Best,
> > >> > > > >> Piotrek
> > >> > > > >>
> > >> > > > >> [1]
> > >> > > > >> https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
> > >> > > > >>
> > >> > > >
> > >> > >
> > >>
> > >
> >
>
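
The motivating `XFunction` quoted above can be sketched with its end-of-input flush moved from close() into a finish() hook. This is a hypothetical, Flink-free model, not the FLIP's implementation: `TableFn<T>` only stands in for `TableFunction<T>`, the external Python process is replaced by an in-memory queue that lags `LAG` results behind its input, and all names are illustrative. It also shows the point made by Piotr and Dawid: because a table function is already typed by its output `T` and inherits `collect(T)`, a parameterless finish() can still emit, whereas a function type without a collector would need something like `finish(Collector<T> out)`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.function.Consumer;

public class XFunctionSketch {

    /** Hypothetical stand-in for a table function that is typed by its output row type T. */
    abstract static class TableFn<T> {
        private Consumer<T> collector;

        void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        /** Emits one output row; usable from eval() and finish() alike. */
        protected final void collect(T row) {
            collector.accept(row);
        }

        void open() {}
        void finish() {}  // end of input: may still call collect()
        void close() {}   // release resources only, no emitting
    }

    /** Hypothetical XFunction: results lag behind inputs, so some are still buffered at end of input. */
    static final class XFunction extends TableFn<String> {
        private static final int LAG = 2;  // assumed number of in-flight results
        private Queue<String> queue;

        @Override
        void open() {
            queue = new ArrayDeque<>();  // stand-in for initMemQueue() / initPythonProc()
        }

        void eval(String in) {
            queue.offer(in.toUpperCase(Locale.ROOT));  // stand-in for work done by the external process
            if (queue.size() > LAG) {
                collect(queue.poll());
            }
        }

        @Override
        void finish() {
            while (!queue.isEmpty()) {  // drain results that are still in flight
                collect(queue.poll());
            }
        }

        @Override
        void close() {
            queue = null;  // cleanup only
        }
    }

    /** Drives the function through open -> eval* -> finish -> close and returns everything it emitted. */
    static List<String> run(List<String> input) {
        List<String> emitted = new ArrayList<>();
        XFunction fn = new XFunction();
        fn.setCollector(emitted::add);
        fn.open();
        for (String in : input) {
            fn.eval(in);
        }
        fn.finish();
        fn.close();
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c", "d")));  // prints [A, B, C, D]
    }
}
```

Without the finish() override, "C" and "D" would remain in the queue when input ends; keeping close() for resource cleanup only is the split this thread converges on for `TableFunction`.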

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Piotr Nowojski <pn...@apache.org>.
Fine by me. Thanks for driving this Lincoln :)

Best, Piotrek


Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Lincoln Lee <li...@gmail.com>.
Hi all,
   I'll start a vote if there are no more objections by this
Thursday (Sep 22). Looking forward to your feedback!

[1] FLIP-260:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
[2] PoC: https://github.com/lincoln-lil/flink/tree/tf-finish-poc

Best,
Lincoln Lee


Lincoln Lee <li...@gmail.com> 于2022年9月19日周一 17:38写道:

> Hi Jingsong,
>    Thank you for participating this discussion!  For the method name, I
> think we should follow the new finish() method in `StreamOperator`,  the
> BoundedOneInput might be removed in the future as discussed [1] before
>
> [1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb
>
> Best,
> Lincoln Lee
>
>
> Jingsong Li <ji...@gmail.com> 于2022年9月19日周一 10:13写道:
>
>> +1 to add `finish()` method to `TableFunction` only.
>>
>> Can we use `endInput` just like `BoundedOneInput`?
>>
>> Best,
>> Jingsong
>>
>> On Fri, Sep 16, 2022 at 11:54 PM Lincoln Lee <li...@gmail.com>
>> wrote:
>> >
>> > Hi Dawid, Piotr,
>> >    Agree with you that add finish() method to `TableFunction` only.
>> Other
>> > `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`,
>> > `AggregateTableFunction`) are not necessarily to have the finish
>> > method(they can not emit records in legacy close() method).
>> >
>> > A `TableFunction` is used to correlate with the left table/stream, the
>> > following example shows a case that user only select columns from the
>> > correlated 'FeatureTF' (no left table column was selected):
>> > ```
>> > SELECT feature1, feature2, feature3
>> > FROM MyTable t1
>> > JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2,
>> > feature3) ON TRUE
>> > ```
>> > the 'FeatureTF' can do some flushing work in legacy close() method and
>> this
>> > doesn't break any sql semantics, so I don't see any reason that we can
>> > enforce users not do flushing work in new finish() method. I've updated
>> the
>> > flip doc to limit the change only for `TableFunction`[1].
>> >
>> > For the more powerful `ProcessFunction`, I'd like to share some
>> thoughts:
>> > There indeed exists requirements for advanced usage in Table/SQL, even a
>> > further UD-Operator, e.g., UD-Join for user controlled join logic which
>> can
>> > not simply expressed by SQL. This is an interesting topic, expect more
>> > discussions on this.
>> >
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > Piotr Nowojski <pn...@apache.org> 于2022年9月15日周四 22:39写道:
>> >
>> > > Hi Dawid, Lincoln,
>> > >
>> > > I would tend to agree with Dawid. It seems to me like `TableFunction`
>> is
>> > > the one that needs to be taken care of. Other types of
>> > > `UserDefinedFunction` wouldn't be able to emit anything from the
>> `finish()`
>> > > even if we added it. And if we added `finish(Collector<T> out)` to
>> them, it
>> > > would create the same problems (how to pass the output type) that
>> prevented
>> > > us from adding `finish()` to all functions in the DataStream API.
>> > >
>> > > However I'm not sure what should be the long term solution for the
>> Table
>> > > API. For the DataStream API we wanted to provide a new, better and
>> more
>> > > powerful `ProcessFunction` for all of the unusual use cases, that
>> currently
>> > > require the use of `StreamOperator` API instead of `DataStream`
>> functions.
>> > > I don't know what would be an alternative in the Table API.
>> > >
>> > > Dawid, who do you think we should ping from the Table API/SQL teams
>> to chip
>> > > in?
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > czw., 15 wrz 2022 o 12:38 Dawid Wysakowicz <dw...@apache.org>
>> > > napisał(a):
>> > >
>> > > > Hey Lincoln,
>> > > >
>> > > > Thanks for opening the discussion.
>> > > >
>> > > > To be honest I am not convinced if emitting from close there is a
>> > > > contract that was envisioned and thus should be maintained. As far
>> as I
>> > > > can see it does affect only the TableFunction, because it has the
>> > > > collect method. None of the other UDFs (ScalarFunction,
>> > > > AggregateFunction) have means to emit records from close().
>> > > >
>> > > > To be honest I am not sure what would be the consequences of
>> interplay
>> > > > with other operators which expect TableFunction to emit only when
>> eval
>> > > > is called. Not sure if there are such.
>> > > >
>> > > > If it is a thing that we are certain we want to support, I'd be much
>> > > > more comfortable adding finish() to the TableFunction instead.
>> Would be
>> > > > happy to hear opinions from the Table API folks.
>> > > >
>> > > > Best,
>> > > >
>> > > > Dawid
>> > > >
>> > > > On 14/09/2022 15:55, Lincoln Lee wrote:
>> > > > > Thanks @Piort for your valuable inputs!
>> > > > >
>> > > > > I did a quick read of the previous discussion you mentioned,
>> seems my
>> > > > flip
>> > > > > title doesn't give a clear scope here and make some confusions,
>> if my
>> > > > > understanding is correct, the UDFs in your context is the user
>> > > > > implemented `org.apache.flink.api.common.functions.Function`s,
>> while
>> > > the
>> > > > > `UserDefinedFunction` I mentioned in the flip is limited to the
>> > > > flink-table
>> > > > > module which located in `org.apache.flink.table.functions`.
>> > > > >
>> > > > > Here's an use case we've met recently (which is indeed the
>> motivation
>> > > to
>> > > > > propose this):
>> > > > > one of our user implemented a
>> > > > > `org.apache.flink.table.functions.TableFunction`, the simplified
>> > > > > pseudo-code is as below:
>> > > > >
>> > > > > ```
>> > > > > class XFunction extend TableFunction<Out> {
>> > > > >
>> > > > >    open(FunctionContext context){
>> > > > >        initMemQueue();
>> > > > >        initPythonProc()
>> > > > >    }
>> > > > >
>> > > > >    eval(In in){
>> > > > >        queue.offer(data)
>> > > > >        Out out = queue.poll()
>> > > > >        if (out != null) {
>> > > > >          collect(out)
>> > > > >        }
>> > > > >    }
>> > > > >
>> > > > >    close(){
>> > > > >        waitForPythonFinish()
>> > > > >        List<Out> outputs = drainQueue()
>> > > > >        outputs.foreach(out -> collect(out))
>> > > > >    }
>> > > > > }
>> > > > > ```
>> > > > > It works well in lower flink versions until they attempt to do a
>> > > upgrade
>> > > > > recently, the 'flush' logic in the legacy close method of
>> > > `TableFunction`
>> > > > > cannot work properly any more.
>> > > > >
>> > > > > Before proposing the flip, I also considered the `flush()`
>> extension on
>> > > > the
>> > > > > `org.apache.flink.api.common.functions.Function`, because some sql
>> > > > > operators are also related, but currently not included in the
>> scope of
>> > > > this
>> > > > > flip, maybe we can discuss it in another thread.
>> > > > >
>> > > > > Wish this helps explaining the reason and welcome your comments
>> here!
>> > > > >
>> > > > > Best,
>> > > > > Lincoln Lee
>> > > > >
>> > > > >
>> > > > > Piotr Nowojski <pn...@apache.org> 于2022年9月14日周三 16:56写道:
>> > > > >
>> > > > >> Hi Lincoln,
>> > > > >>
>> > > > >> Thanks for the proposal. Have you seen the old discussion about
>> > > > >> adding this `finish()` method? [1] We didn't add it to UDFs, as we
>> > > > >> didn't see a motivation (maybe we have missed something), and at the
>> > > > >> same time it wasn't that easy. Plain `finish()` wouldn't be enough.
>> > > > >> Users would need a way to output records from the `finish()` call,
>> > > > >> so it would have to be typed with the user record
>> > > > >> (`finish(Collector<T> output)`). On the other hand, we couldn't find
>> > > > >> an example where a user would actually need the `finish()` call in a
>> > > > >> UDF, as it seemed to us that it only makes sense for
>> > > > >> operators/functions that buffer records. Note that back then, during
>> > > > >> the discussion, we were referring to this method as `flush()` or
>> > > > >> `drain()`.
>> > > > >>
>> > > > >> Can you shed some more light and provide more details on the exact
>> > > > >> motivating example behind this proposal?
>> > > > >>
>> > > > >> Best,
>> > > > >> Piotrek
>> > > > >>
>> > > > >> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp
>> > > > >>
>
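To make the migration discussed in this thread concrete, here is a minimal, self-contained Java sketch. It does not use Flink at all: `SketchTableFunction`, `LaggingFunction`, `LegacyLaggingFunction` and `Lifecycle` are hypothetical names, and the model assumes (as reported above for the upgraded job) that records emitted from close() are no longer accepted once the finish phase is over. `LaggingFunction` mimics XFunction by holding back its most recent result, and flushes it from a `finish()` hook of the kind FLIP-260 proposes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified stand-in for a TableFunction; NOT the Flink class.
// Modeled assumption: once finish() has returned, the runtime stops accepting
// records, so anything emitted from close() is rejected.
abstract class SketchTableFunction<T> {
    private List<T> output;
    private boolean outputClosed;

    final void attach(List<T> output) { this.output = output; }

    final void closeOutput() { outputClosed = true; }

    protected final void collect(T record) {
        if (outputClosed) {
            throw new IllegalStateException("record emitted after finish: " + record);
        }
        output.add(record);
    }

    public void open() {}

    public void finish() {}   // the hook FLIP-260 proposes (assumed no-arg shape)

    public void close() {}    // should only release resources
}

// Mimics XFunction: results lag one record behind, like an external worker.
class LaggingFunction extends SketchTableFunction<String> {
    protected final Queue<String> pending = new ArrayDeque<>();

    public void eval(String in) {
        pending.offer(in.toUpperCase());
        if (pending.size() > 1) {
            collect(pending.poll());
        }
    }

    // Migrated flush logic: drain while the output is still open.
    @Override
    public void finish() {
        while (!pending.isEmpty()) {
            collect(pending.poll());
        }
    }
}

// The pre-1.14 style: the same flush placed in close() instead of finish().
class LegacyLaggingFunction extends LaggingFunction {
    @Override
    public void finish() {}   // no finish logic

    @Override
    public void close() {
        while (!pending.isEmpty()) {
            collect(pending.poll());
        }
    }
}

final class Lifecycle {
    // Drives open -> eval* -> finish -> (output closed) -> close.
    static List<String> run(LaggingFunction fn, List<String> inputs) {
        List<String> out = new ArrayList<>();
        fn.attach(out);
        fn.open();
        for (String in : inputs) {
            fn.eval(in);
        }
        fn.finish();
        fn.closeOutput();
        fn.close();
        return out;
    }
}
```

In this model, `Lifecycle.run(new LaggingFunction(), List.of("a", "b", "c"))` returns `[A, B, C]`, while the same call with `LegacyLaggingFunction` throws `IllegalStateException` when close() tries to emit `C`. The split mirrors the lifecycle change from FLINK-22972: flushing belongs in finish(), and close() only releases resources.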

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Lincoln Lee <li...@gmail.com>.
Hi Jingsong,
   Thank you for participating in this discussion! For the method name, I
think we should follow the new finish() method in `StreamOperator`, since
`BoundedOneInput` might be removed in the future, as discussed before [1].

[1] https://lists.apache.org/thread/3ozw653ql8jso9w55p4pw8p4909trvkb

Best,
Lincoln Lee



Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Jingsong Li <ji...@gmail.com>.
+1 to adding a `finish()` method to `TableFunction` only.

Can we name it `endInput`, just like in `BoundedOneInput`?

Best,
Jingsong


Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Lincoln Lee <li...@gmail.com>.
Hi Dawid, Piotr,
   I agree with you that we should add a finish() method to `TableFunction`
only. Other `UserDefinedFunction`s (`ScalarFunction`, `AggregateFunction`,
`TableAggregateFunction`) do not need a finish method (they cannot emit
records in the legacy close() method).

A `TableFunction` is correlated with the left table/stream. The following
example shows a case where the user selects only columns from the
correlated 'FeatureTF' (no column of the left table is selected):
```
SELECT feature1, feature2, feature3
FROM MyTable t1
JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) AS F(feature1, feature2,
feature3) ON TRUE
```
Here 'FeatureTF' can do some flushing work in the legacy close() method
without breaking any SQL semantics, so I don't see any reason to forbid
users from doing flushing work in the new finish() method. I've updated the
FLIP doc to limit the change to `TableFunction` only [1].

Regarding the more powerful `ProcessFunction`, I'd like to share some
thoughts: there are indeed requirements for advanced usage in Table/SQL,
even for a user-defined operator (UD-Operator), e.g., a UD-Join for
user-controlled join logic that cannot be expressed simply in SQL. This is
an interesting topic, and I look forward to more discussion on it.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+TableFunction

Best,
Lincoln Lee
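The 'FeatureTF' argument can be illustrated with a small conceptual model in Java. It is not Flink's correlate operator: `LateralModel`, `JoinedRow` and the batching `FeatureTF` below are hypothetical, and the model assumes that a row flushed from finish() has no current left row to be paired with (represented as `leftF0 == null`). Because the query projects only function columns, the flushed row is still a complete result.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model of `JOIN LATERAL TABLE(FeatureTF(t1.f0, t1.f1)) ON TRUE`.
// NOT Flink's correlate operator. Assumption: rows emitted from finish()
// have no current left row, modeled as leftF0 == null.
final class LateralModel {
    static final class JoinedRow {
        final String leftF0;   // column of MyTable t1 (null for flushed rows)
        final int feature1;    // column produced by the function

        JoinedRow(String leftF0, int feature1) {
            this.leftF0 = leftF0;
            this.feature1 = feature1;
        }
    }

    // Hypothetical FeatureTF: sums f1 over batches of two rows and
    // flushes an incomplete batch at end of input.
    static final class FeatureTF {
        private int sum;
        private int count;

        List<Integer> eval(String f0, int f1) {
            sum += f1;
            count++;
            List<Integer> out = new ArrayList<>();
            if (count == 2) {
                out.add(sum);
                sum = 0;
                count = 0;
            }
            return out;
        }

        List<Integer> finish() {
            List<Integer> out = new ArrayList<>();
            if (count > 0) {
                out.add(sum);
                sum = 0;
                count = 0;
            }
            return out;
        }
    }

    // Runs the lateral join, then applies `SELECT feature1` (TF columns only).
    static List<Integer> selectFeaturesOnly(String[] f0s, int[] f1s) {
        FeatureTF tf = new FeatureTF();
        List<JoinedRow> joined = new ArrayList<>();
        for (int i = 0; i < f0s.length; i++) {
            for (int feature : tf.eval(f0s[i], f1s[i])) {
                joined.add(new JoinedRow(f0s[i], feature));
            }
        }
        for (int feature : tf.finish()) {
            joined.add(new JoinedRow(null, feature)); // no left row to pair with
        }
        List<Integer> projected = new ArrayList<>();
        for (JoinedRow row : joined) {
            projected.add(row.feature1);
        }
        return projected;
    }
}
```

With `f0 = {"a", "b", "c"}` and `f1 = {1, 2, 4}`, `selectFeaturesOnly` returns `[3, 4]`: 3 comes from eval() on the second row and 4 is flushed by finish(). In this model the flushed joined row stores `null` for `t1.f0`, which is why a query that also selected left-table columns raises the interplay question discussed in this thread.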



Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Dawid, Lincoln,

I would tend to agree with Dawid. It seems to me like `TableFunction` is
the one that needs to be taken care of. Other types of
`UserDefinedFunction` wouldn't be able to emit anything from the `finish()`
even if we added it. And if we added `finish(Collector<T> out)` to them, it
would create the same problems (how to pass the output type) that prevented
us from adding `finish()` to all functions in the DataStream API.

However, I'm not sure what the long-term solution for the Table API should
be. For the DataStream API we wanted to provide a new, better and more
powerful `ProcessFunction` for all of the unusual use cases that currently
require the use of the `StreamOperator` API instead of `DataStream` functions.
I don't know what the alternative would be in the Table API.

Dawid, who do you think we should ping from the Table API/SQL teams to chip
in?

Best,
Piotrek
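The output-type problem can be sketched with a few hypothetical Java shapes (none of them are Flink interfaces; `java.util.function.Consumer` stands in for Flink's `Collector`). A scalar-style function only outputs through eval's return value, a generic function would need the output type threaded into `finish(...)`, whereas a table-function-style class already owns a typed collector, so a plain no-arg `finish()` is enough.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical shapes for illustration only; none of these are Flink APIs.
// Consumer<OUT> plays the role of Flink's Collector<OUT>.

// Scalar-style: the only output channel is eval's return value,
// so a no-arg finish() would have nowhere to send a record.
interface ScalarLike<IN, OUT> {
    OUT eval(IN in);
}

// Generic function: finish() must receive a collector typed with OUT.
interface FlushableLike<IN, OUT> {
    void process(IN in, Consumer<OUT> out);

    void finish(Consumer<OUT> out);
}

// Table-function-style: the typed collector is set once by the runtime,
// so a plain finish() can emit through collect().
abstract class TableLike<OUT> {
    private Consumer<OUT> collector;

    final void setCollector(Consumer<OUT> collector) {
        this.collector = collector;
    }

    protected final void collect(OUT record) {
        collector.accept(record);
    }

    public void finish() {}
}

// Counts its inputs and reports the total at end of input (generic shape).
class CountingFlushable implements FlushableLike<String, Integer> {
    private int count;

    @Override
    public void process(String in, Consumer<Integer> out) {
        count++;
    }

    @Override
    public void finish(Consumer<Integer> out) {
        out.accept(count);
    }
}

// The same logic in the table-function shape.
class CountingTableLike extends TableLike<Integer> {
    private int count;

    public void eval(String in) {
        count++;
    }

    @Override
    public void finish() {
        collect(count);
    }
}
```

Both counters emit the same total; the difference is only where the typed output channel comes from, which is why adding `finish()` to `TableFunction` alone avoids the typing question.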

On Thu, Sep 15, 2022 at 12:38, Dawid Wysakowicz <dw...@apache.org> wrote:

> Hey Lincoln,
>
> Thanks for opening the discussion.
>
> To be honest, I am not convinced that emitting from close() is a
> contract that was envisioned and thus should be maintained. As far as I
> can see, it only affects the TableFunction, because it has the
> collect method. None of the other UDFs (ScalarFunction,
> AggregateFunction) have a means to emit records from close().
>
> I am also not sure what the consequences would be for the interplay
> with other operators that expect a TableFunction to emit only when eval
> is called, or whether such operators exist at all.
>
> If this is something we are certain we want to support, I'd be much
> more comfortable adding finish() to the TableFunction instead. I would be
> happy to hear opinions from the Table API folks.
>
> Best,
>
> Dawid
>
> >> śr., 14 wrz 2022 o 08:22 Lincoln Lee <li...@gmail.com>
> napisał(a):
> >>
> >>> Hello everyone,
> >>>
> >>>    I’d like to open a discussion on FLIP-260[1]: expose finish method
> for
> >>> UserDefinedFunction, this makes a chance for users who rely on finish
> >> logic
> >>> in the legacy close() method (< 1.14) to migrate to the new finish()
> >>> method.
> >>>
> >>>    The task lifecycle was changed in FLINK-22972[2]: a new finish()
> phase
> >>> was introduced (extracted the ‘finish’ part out of the ‘close’) and
> >> removed
> >>> the dispose() method. This change was also done in table module (e.g.,
> >>> `AbstractMapBundleOperator` for mini-batch operation ) but not covered
> >> the
> >>> UserDefinedFunction which only exposes open() and close() api for
> custom
> >>> usage, those customers who rely on the legacy close() api may encounter
> >>> wrong result or suffer runtime errors after upgrading to the new
> version.
> >>> Strictly speaking, it is a bug caused by the breaking change, but due
> to
> >>> the public api change, we propose this flip.
> >>>
> >>>    Looking forward to your comments or feedback.
> >>>
> >>> [1]
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> >>> [2] https://issues.apache.org/jira/browse/FLINK-22972
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
>

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Lincoln,

Thanks for opening the discussion.

To be honest, I am not convinced that emitting records from close() is a
contract that was envisioned and thus should be maintained. As far as I
can see, it only affects TableFunction, because it has the collect
method. None of the other UDFs (ScalarFunction, AggregateFunction) have
a means to emit records from close().

I am also not sure what the consequences would be for the interplay
with other operators that expect a TableFunction to emit only when eval
is called, or whether any such operators exist.
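To make the interplay concern concrete, here is a toy model, assuming a correlate-style operator that pairs every row a table function emits with the input row of the eval() call currently in progress. None of these classes are Flink APIs: ToyTableFunction, ToyCorrelate and FlushOnClose are hypothetical names, and the model does not claim to reproduce what Flink's own operator does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only (hypothetical, not a Flink class): a table function that can
// emit rows through collect(), like TableFunction does.
abstract class ToyTableFunction<T> {
    private Consumer<T> collector;

    void setCollector(Consumer<T> collector) { this.collector = collector; }

    protected final void collect(T row) { collector.accept(row); }

    abstract void eval(String input);

    void close() {}
}

// Toy correlate: joins each emitted row with whichever input row is
// "current" at the moment the row is emitted.
class ToyCorrelate {
    static List<String> run(ToyTableFunction<String> fn, List<String> inputs) {
        List<String> joined = new ArrayList<>();
        String[] current = {null};
        fn.setCollector(row -> joined.add(current[0] + "|" + row));
        for (String in : inputs) {
            current[0] = in;
            fn.eval(in);
        }
        fn.close(); // rows emitted here still see the last input as "current"
        return joined;
    }
}

// Buffers every input and emits only from close(), like the legacy flush pattern.
class FlushOnClose extends ToyTableFunction<String> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    void eval(String input) { buffer.add(input.toUpperCase()); }

    @Override
    void close() { buffer.forEach(this::collect); }
}
```

Running FlushOnClose over the inputs "a" and "b" yields "b|A" and "b|B": in this model the row derived from "a" ends up paired with "b", which is the kind of surprise an operator that expects emission only from eval could run into.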

If this is something we are certain we want to support, I'd be much
more comfortable adding finish() to TableFunction instead. I would be
happy to hear opinions from the Table API folks.

Best,

Dawid

On 14/09/2022 15:55, Lincoln Lee wrote:
> Thanks @Piort for your valuable inputs!
>
> I did a quick read of the previous discussion you mentioned, seems my flip
> title doesn't give a clear scope here and make some confusions, if my
> understanding is correct, the UDFs in your context is the user
> implemented `org.apache.flink.api.common.functions.Function`s, while the
> `UserDefinedFunction` I mentioned in the flip is limited to the flink-table
> module which located in `org.apache.flink.table.functions`.
>
> Here's an use case we've met recently (which is indeed the motivation to
> propose this):
> one of our user implemented a
> `org.apache.flink.table.functions.TableFunction`, the simplified
> pseudo-code is as below:
>
> ```
> class XFunction extend TableFunction<Out> {
>
>    open(FunctionContext context){
>        initMemQueue();
>        initPythonProc()
>    }
>
>    eval(In in){
>        queue.offer(data)
>        Out out = queue.poll()
>        if (out != null) {
>          collect(out)
>        }
>    }
>
>    close(){
>        waitForPythonFinish()
>        List<Out> outputs = drainQueue()
>        outputs.foreach(out -> collect(out))
>    }
> }
> ```
> It works well in lower flink versions until they attempt to do a upgrade
> recently, the 'flush' logic in the legacy close method of `TableFunction`
> cannot work properly any more.
>
> Before proposing the flip, I also considered the `flush()` extension on the
> `org.apache.flink.api.common.functions.Function`, because some sql
> operators are also related, but currently not included in the scope of this
> flip, maybe we can discuss it in another thread.
>
> Wish this helps explaining the reason and welcome your comments here!
>
> Best,
> Lincoln Lee

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Lincoln Lee <li...@gmail.com>.
Thanks @Piotr for your valuable input!

I did a quick read of the previous discussion you mentioned. It seems my FLIP
title doesn't make the scope clear and has caused some confusion. If my
understanding is correct, the UDFs in your context are user-implemented
`org.apache.flink.api.common.functions.Function`s, while the
`UserDefinedFunction` I mentioned in the FLIP is limited to the flink-table
module, located in `org.apache.flink.table.functions`.

Here's a use case we met recently (which is indeed the motivation for this
proposal): one of our users implemented an
`org.apache.flink.table.functions.TableFunction`; simplified pseudo-code
is below:

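```java
import java.util.List;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

// Simplified: In/Out are placeholder types; `queue` and the helpers
// initMemQueue(), initPythonProc(), waitForPythonFinish() and drainQueue()
// belong to the user's code and are omitted here.
public class XFunction extends TableFunction<Out> {

  @Override
  public void open(FunctionContext context) throws Exception {
      initMemQueue();
      initPythonProc();
  }

  // Hands each input to the Python process and emits any result already available.
  public void eval(In in) {
      queue.offer(in);
      Out out = queue.poll();
      if (out != null) {
        collect(out);
      }
  }

  // Legacy "flush": wait for Python to finish, then emit the remaining results.
  @Override
  public void close() throws Exception {
      waitForPythonFinish();
      List<Out> outputs = drainQueue();
      outputs.forEach(out -> collect(out));
  }
}
```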
This worked well in older Flink versions, but when they recently attempted
an upgrade, the 'flush' logic in the legacy close method of `TableFunction`
no longer worked properly.
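A minimal sketch of the migration the FLIP would enable for a function like XFunction, assuming a toy driver (not Flink's runtime) that calls open, eval for each input, finish, and then close, and that rejects any record emitted after finish. The rejection only serves to make the failure visible; as described in the original proposal, the real symptom may be wrong results or runtime errors. LifecycleFunction, Driver, LegacyFlush and MigratedFlush are hypothetical names, and the `pending` list stands in for the queue fed by the Python process.

```java
import java.util.ArrayList;
import java.util.List;

// Toy lifecycle (hypothetical, not Flink's runtime): open -> eval* -> finish -> close.
// Records emitted after the finish phase are rejected to make late emits visible.
abstract class LifecycleFunction<T> {
    private List<T> sink;
    private boolean sealed;

    final void attach(List<T> sink) { this.sink = sink; }
    final void seal() { this.sealed = true; }

    protected final void collect(T row) {
        if (sealed) {
            throw new IllegalStateException("output already finished: " + row);
        }
        sink.add(row);
    }

    void open() {}
    abstract void eval(String in);
    void finish() {}   // hypothetical hook, in the spirit of the proposed finish()
    void close() {}    // should only release resources
}

final class Driver {
    static <T> List<T> run(LifecycleFunction<T> fn, List<String> inputs) {
        List<T> out = new ArrayList<>();
        fn.attach(out);
        fn.open();
        for (String in : inputs) fn.eval(in);
        fn.finish();
        fn.seal();   // downstream is done; no more output accepted
        fn.close();
        return out;
    }
}

// Legacy pattern from XFunction: buffered results are drained in close().
class LegacyFlush extends LifecycleFunction<String> {
    final List<String> pending = new ArrayList<>();
    @Override void eval(String in) { pending.add(in); }
    @Override void close() { pending.forEach(this::collect); }
}

// Migrated pattern: the same drain logic moved into finish().
class MigratedFlush extends LifecycleFunction<String> {
    final List<String> pending = new ArrayList<>();
    @Override void eval(String in) { pending.add(in); }
    @Override void finish() { pending.forEach(this::collect); pending.clear(); }
}
```

In this model, `Driver.run(new MigratedFlush(), List.of("a", "b"))` returns `[a, b]`, whereas the same run with LegacyFlush throws an IllegalStateException from close().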

Before proposing the FLIP, I also considered a `flush()` extension on
`org.apache.flink.api.common.functions.Function`, because some SQL
operators are also related, but that is not currently in the scope of this
FLIP; maybe we can discuss it in another thread.

I hope this helps explain the motivation, and your comments are welcome!

Best,
Lincoln Lee


On Wed, Sep 14, 2022 at 16:56, Piotr Nowojski <pn...@apache.org> wrote:

> Hi Lincoln,
>
> Thanks for the proposal. Have you seen the old discussion about adding this
> `finish()` method? [1] We didn't add it to UDFs, as we didn't see a
> motivation (maybe we have missed something), and at the same time it wasn't
> that easy. Plain `finish()` wouldn't be enough. Users would need a way to
> output records from the `finish()` call, so it would have to be typed with
> the user record (`finish(Collector<T> output)`). On the other hand, we
> couldn't find an example where a user would actually need the `finish()`
> call in an UDF, as it seemed to us it makes only sense for
> operators/functions that are buffering records. Note back then, during the
> discussion, we were referring to this method as `flush()` or `drain()`.
>
> Can you shed some more light and provide more details on the exact
> motivating example behind this proposal?
>
> Best,
> Piotrek
>
> [1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp

Re: [DISCUSS] FLIP-260: Expose Finish Method For UserDefinedFunction

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Lincoln,

Thanks for the proposal. Have you seen the old discussion about adding this
`finish()` method? [1] We didn't add it to UDFs, as we didn't see a
motivation (maybe we missed something), and at the same time it wasn't
that easy. A plain `finish()` wouldn't be enough. Users would need a way to
output records from the `finish()` call, so it would have to be typed with
the user record (`finish(Collector<T> output)`). On the other hand, we
couldn't find an example where a user would actually need the `finish()`
call in a UDF, as it seemed to us to make sense only for
operators/functions that buffer records. Note that back then, during the
discussion, we were referring to this method as `flush()` or `drain()`.
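One way to read the `finish(Collector<T> output)` point above, as a sketch: FinishableFunction and Batcher are hypothetical (not Flink APIs), and `Collector` here is a local stand-in that only mirrors the `collect` method of Flink's `org.apache.flink.util.Collector`. A function that buffers records into batches loses its trailing partial batch unless a typed end-of-input hook hands it a collector.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for a record collector (mirrors only collect()).
interface Collector<T> {
    void collect(T record);
}

// Hypothetical interface: finish() receives a collector typed with the
// user's output record, so buffered records can still be emitted.
interface FinishableFunction<IN, OUT> {
    void process(IN value, Collector<OUT> out);

    // Called once at end of input; the last chance to emit buffered records.
    default void finish(Collector<OUT> out) {}
}

// A buffering function: groups records into batches of `size`.
class Batcher implements FinishableFunction<Integer, List<Integer>> {
    private final int size;
    private List<Integer> batch = new ArrayList<>();

    Batcher(int size) { this.size = size; }

    @Override
    public void process(Integer value, Collector<List<Integer>> out) {
        batch.add(value);
        if (batch.size() == size) {
            out.collect(batch);
            batch = new ArrayList<>();
        }
    }

    @Override
    public void finish(Collector<List<Integer>> out) {
        // Without this hook the trailing partial batch would be silently dropped.
        if (!batch.isEmpty()) {
            out.collect(batch);
            batch = new ArrayList<>();
        }
    }
}
```

For example, with a batch size of 2 and inputs 1 to 5, process emits [1, 2] and [3, 4]; finish then emits the remaining [5].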

Can you shed some more light and provide more details on the exact
motivating example behind this proposal?

Best,
Piotrek

[1] https://lists.apache.org/thread/gmr9r3n3ktojt4bhoxz4t8qho6h7d1rp

On Wed, Sep 14, 2022 at 08:22, Lincoln Lee <li...@gmail.com> wrote:

> Hello everyone,
>
>   I’d like to open a discussion on FLIP-260[1]: expose finish method for
> UserDefinedFunction, this makes a chance for users who rely on finish logic
> in the legacy close() method (< 1.14) to migrate to the new finish()
> method.
>
>   The task lifecycle was changed in FLINK-22972[2]: a new finish() phase
> was introduced (extracted the ‘finish’ part out of the ‘close’) and removed
> the dispose() method. This change was also done in table module (e.g.,
> `AbstractMapBundleOperator` for mini-batch operation ) but not covered the
> UserDefinedFunction which only exposes open() and close() api for custom
> usage, those customers who rely on the legacy close() api may encounter
> wrong result or suffer runtime errors after upgrading to the new version.
> Strictly speaking, it is a bug caused by the breaking change, but due to
> the public api change, we propose this flip.
>
>   Looking forward to your comments or feedback.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-260%3A+Expose+Finish+Method+For+UserDefinedFunction
> [2] https://issues.apache.org/jira/browse/FLINK-22972
>
> Best,
> Lincoln Lee
>