Posted to dev@flink.apache.org by Lorenzo Pirazzini <lo...@agilelab.it> on 2020/10/08 09:32:14 UTC
Wrapping a Flink Function
Hello, I'm having trouble finding a way to add logic to an existing SinkFunction.
What I would like to do is wrap an input SinkFunction inside another one that will perform its logic and then perform some additional logic, e.g.:
    SinkFunction<TOUT> wrapFunction(SinkFunction<TOUT> function) {
        return new SinkFunction<TOUT>() {
            @Override
            public void invoke(TOUT value, Context context) throws Exception {
                function.invoke(value, context);
                // additional logic
            }
        };
    }
This is not clean: if the input function is actually a RichSinkFunction (or another extension that adds further functionality), I lose that functionality because my wrapper does not expose it. For example, if I take a RichSinkFunction as the parameter, I lose its open() and close() methods.
One thing I could do is define a separate wrapper for each type that extends SinkFunction, each exposing all of that type's methods and forwarding them to the corresponding method of the wrapped function, but I don't see this as a good solution.
Is there a way to add my custom logic to a function while keeping all its features?
Thanks in advance
Lorenzo Pirazzini
Big Data Engineer
E-mail: lorenzo.pirazzini@agilelab.it
Web Site: www.agilelab.it
Re: Wrapping a Flink Function
Posted by Aljoscha Krettek <al...@apache.org>.
Could you maybe outline how you want to extend the wrapped sink
functionality? A better approach might be to add an operation "in front"
of the sink.
Best,
Aljoscha
Re: Wrapping a Flink Function
Posted by Teng Fei Liao <te...@gmail.com>.
I think any solution here is inherently fragile, since future versions of
Flink may introduce different abstract classes or interfaces that your
wrapper won't know it has to support. For a given release, though,
something you can consider is a wrapper class that extends/implements the
ones you support. Then, in each method, check
delegate instanceof ClassWhereMethodComesFrom and no-op if this is false.
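
A minimal sketch of that idea follows. To stay self-contained it does not use Flink: Sink and Lifecycle are hypothetical stand-ins that only mimic the rough shape of SinkFunction and of the open()/close() part of a rich function (checked exceptions and the Context argument are dropped for brevity). WrappingSink, RichSink and run are likewise made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperSketch {

    // Stand-in for Flink's SinkFunction<T>: NOT the real interface, only its rough shape.
    interface Sink<T> {
        void invoke(T value);
    }

    // Stand-in for the lifecycle methods a rich function adds (assumption, simplified).
    interface Lifecycle {
        void open();
        void close();
    }

    // The wrapper implements every interface it chooses to support and forwards
    // each call only if the delegate implements the interface the method comes from.
    static final class WrappingSink<T> implements Sink<T>, Lifecycle {
        private final Sink<T> delegate;
        private final List<String> events;

        WrappingSink(Sink<T> delegate, List<String> events) {
            this.delegate = delegate;
            this.events = events;
        }

        @Override
        public void open() {
            if (delegate instanceof Lifecycle) {
                ((Lifecycle) delegate).open();
            } // otherwise: no-op
        }

        @Override
        public void close() {
            if (delegate instanceof Lifecycle) {
                ((Lifecycle) delegate).close();
            } // otherwise: no-op
        }

        @Override
        public void invoke(T value) {
            delegate.invoke(value);
            events.add("after:" + value); // the additional logic
        }
    }

    // A delegate that has lifecycle methods, playing the role of a rich sink.
    static final class RichSink implements Sink<String>, Lifecycle {
        private final List<String> events;

        RichSink(List<String> events) {
            this.events = events;
        }

        @Override public void open() { events.add("open"); }
        @Override public void close() { events.add("close"); }
        @Override public void invoke(String value) { events.add("rich:" + value); }
    }

    // Drives the wrapper the way a runtime would: open, invoke, close.
    public static List<String> run(boolean rich) {
        List<String> events = new ArrayList<>();
        Sink<String> delegate = rich
                ? new RichSink(events)
                : value -> events.add("plain:" + value);
        WrappingSink<String> wrapper = new WrappingSink<>(delegate, events);
        wrapper.open();
        wrapper.invoke("a");
        wrapper.close();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With a rich delegate the lifecycle calls reach it; with a plain one they are silently skipped. The caveat above still applies: any interface the wrapper does not list is lost, so the set of supported types has to be revisited for each Flink release.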