Posted to user@flink.apache.org by "Sharipov, Rinat" <r....@cleverdata.ru> on 2020/10/14 19:19:01 UTC

PyFlink :: Bootstrap UDF function

Hi mates!

I'm continuing to explore the new PyFlink features, and I'm really
excited about this functionality.
My main goal is to understand how to integrate our ML registry, powered by
MLflow, with PyFlink jobs, and what restrictions apply.

I need to bootstrap the UDF on startup, when it's instantiated in the
Apache Beam process, but I don't know whether PyFlink calls it from a
single thread or shares it among multiple threads. In other words, do I
need to synchronize my bootstrap logic or not?

Here is a code example of my UDF function:

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class MyFunction(ScalarFunction):
    def __init__(self):
        self.initialized = False

    def __bootstrap(self):
        return "bootstrapped"

    def eval(self, urls):
        # Run the bootstrap logic once, on the first call.
        if not self.initialized:
            self.__bootstrap()
            self.initialized = True
        return "my-result"


my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())],
                  DataTypes.STRING())


Thanks a lot for your help!

Re: PyFlink :: Bootstrap UDF function

Posted by "Sharipov, Rinat" <r....@cleverdata.ru>.
Hi Dian!

Thanks a lot for your reply; it's very helpful for us.



On Thu, 15 Oct 2020 at 04:30, Dian Fu <di...@gmail.com> wrote:

> Hi Rinat,
>
> It's called in a single-threaded fashion, so there is no need for
> synchronization.
>
> Besides, ScalarFunction has a pair of open/close methods; you could
> override them and perform the initialization work in the open method.
>
> Regards,
> Dian

Re: PyFlink :: Bootstrap UDF function

Posted by Dian Fu <di...@gmail.com>.
Hi Rinat,

It's called in a single-threaded fashion, so there is no need for synchronization.

Besides, ScalarFunction has a pair of open/close methods; you could override them and perform the initialization work in the open method.
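
For illustration, a minimal sketch of the open/close approach might look like the following. It assumes the one-time setup can live entirely in open(); the "bootstrapped" string is only a placeholder for whatever the real bootstrap would load from the registry, and the fallback ScalarFunction stub is a hypothetical stand-in so the sketch can be run without PyFlink installed.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Hypothetical stand-in, used only when PyFlink is not installed,
    # so that the lifecycle below can still be exercised locally.
    class ScalarFunction:
        def open(self, function_context):
            pass

        def close(self):
            pass


class MyFunction(ScalarFunction):
    def __init__(self):
        self.model = None

    def open(self, function_context):
        # One-time initialization. Per the reply above, the function is
        # called from a single thread, so no locking is used here.
        # Placeholder: a real job would load its model/resources instead.
        self.model = "bootstrapped"

    def close(self):
        # Release whatever open() acquired.
        self.model = None

    def eval(self, urls):
        return "my-result"


# Local walk-through, calling the methods by hand in the assumed order
# open -> eval -> close. In a real job the framework drives these calls.
f = MyFunction()
f.open(None)
state_after_open = f.model
result = f.eval(["http://example.com"])
f.close()
```

Registration with udf(...) would stay the same as in the original message; only the place where the bootstrap logic runs changes, and the "initialized" flag is no longer needed.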

Regards,
Dian

> On 15 Oct 2020, at 3:19 AM, Sharipov, Rinat <r....@cleverdata.ru> wrote:
> 
> Hi mates!
> 
> I'm continuing to explore the new PyFlink features, and I'm really excited about this functionality.
> My main goal is to understand how to integrate our ML registry, powered by MLflow, with PyFlink jobs, and what restrictions apply.
> 
> I need to bootstrap the UDF on startup, when it's instantiated in the Apache Beam process, but I don't know whether PyFlink calls it from a single thread or shares it among multiple threads. In other words, do I need to synchronize my bootstrap logic or not?
> 
> Here is a code example of my UDF function:
> 
> from pyflink.table import DataTypes
> from pyflink.table.udf import ScalarFunction, udf
> 
> class MyFunction(ScalarFunction):
>     def __init__(self):
>         self.initialized = False
> 
>     def __bootstrap(self):
>         return "bootstrapped"
> 
>     def eval(self, urls):
>         # Run the bootstrap logic once, on the first call.
>         if not self.initialized:
>             self.__bootstrap()
>             self.initialized = True
>         return "my-result"
> 
> my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())
> 
> Thanks a lot for your help!