Posted to dev@flink.apache.org by Lorenzo Pirazzini <lo...@agilelab.it> on 2020/10/08 09:32:14 UTC

Wrapping a Flink Function

Hello, I'm having trouble finding a way to add logic to an existing SinkFunction.
What I would like to do is wrap an input SinkFunction inside another one that runs the wrapped function's logic and then performs some additional logic, e.g.:

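import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Returns a sink that delegates to the given function, then runs extra logic.
SinkFunction<TOUT> wrapFunction(SinkFunction<TOUT> function) {
  return new SinkFunction<TOUT>() {
    @Override
    public void invoke(TOUT value, Context context) throws Exception {
      function.invoke(value, context); // wrapped sink runs first
      // additional logic
    }
  };
}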

This is not clean: if the input function is actually a RichSinkFunction (or another extension that adds further functionality), I lose that functionality because my wrapper does not expose it. For example, if I take a RichSinkFunction as the parameter, I lose its open() and close() methods.
One thing I could do is define a separate wrapper for each type extending SinkFunction, each exposing all of that type's methods and forwarding them to the corresponding method of the wrapped function, but I don't see this as a good solution.
Is there a way to add my custom logic to a function while keeping all of its features?

Thanks in advance


Lorenzo Pirazzini
Big Data Engineer
E-mail: lorenzo.pirazzini@agilelab.it
Web Site: www.agilelab.it





Re: Wrapping a Flink Function

Posted by Aljoscha Krettek <al...@apache.org>.
Could you maybe outline how you want to extend the wrapped sink 
functionality? A better approach might be to add an operation "in front" 
of the sink.

Best,
Aljoscha



Re: Wrapping a Flink Function

Posted by Teng Fei Liao <te...@gmail.com>.
I think any solution here is inherently fragile, since future versions of
Flink can add abstract classes or interfaces that your wrapper won't know
it has to support. But for a given release, something you can consider is a
wrapper class that extends/implements the ones you support. Then, in each
method, check delegate instanceof ClassWhereMethodComesFrom and no-op if
this is false.
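
The instanceof approach described above could look roughly like the following. This is a minimal sketch, not Flink code: Sink and Lifecycle are hypothetical stand-ins for SinkFunction and for the open()/close() part of a rich function, simplified to drop Context, Configuration, and checked exceptions so that the example compiles on its own. WrappingSink, RecordingRichSink, and the Consumer that carries the additional logic are illustrative names as well.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's SinkFunction (simplified: no Context).
interface Sink<T> {
    void invoke(T value);
}

// Hypothetical stand-in for the open()/close() lifecycle of a rich function.
interface Lifecycle {
    void open();
    void close();
}

// Implements every interface the wrapper chooses to support and forwards each
// call only if the delegate implements the interface that method comes from.
final class WrappingSink<T> implements Sink<T>, Lifecycle {
    private final Sink<T> delegate;
    private final Consumer<T> additionalLogic;

    WrappingSink(Sink<T> delegate, Consumer<T> additionalLogic) {
        this.delegate = delegate;
        this.additionalLogic = additionalLogic;
    }

    @Override
    public void invoke(T value) {
        delegate.invoke(value);        // wrapped sink runs first
        additionalLogic.accept(value); // then the additional logic
    }

    @Override
    public void open() {
        if (delegate instanceof Lifecycle) {
            ((Lifecycle) delegate).open();
        } // otherwise no-op: a plain sink has nothing to open
    }

    @Override
    public void close() {
        if (delegate instanceof Lifecycle) {
            ((Lifecycle) delegate).close();
        } // otherwise no-op
    }
}

// Example delegate with a lifecycle; it records each call so the order is visible.
final class RecordingRichSink implements Sink<String>, Lifecycle {
    private final List<String> log;

    RecordingRichSink(List<String> log) {
        this.log = log;
    }

    @Override public void open() { log.add("open"); }
    @Override public void invoke(String value) { log.add("sink:" + value); }
    @Override public void close() { log.add("close"); }
}
```

Wrapping a RecordingRichSink and calling open(), invoke("a"), close() forwards all three calls and runs the additional logic after the delegate's invoke. Wrapping a plain lambda sink turns open() and close() into no-ops. In real Flink code the wrapper and the additional logic would also have to be serializable, and the wrapper would need to forward the runtime context to the delegate, which this sketch leaves out.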
