Posted to user@flink.apache.org by Pan Glust <pa...@gmail.com> on 2018/03/16 11:35:01 UTC

Basic question about flink programs

Hello everyone,

Coming from the Spring/CDI world, I'm new to Flink and to stream
processing in general, so apologies for the very basic questions. I wrote a
simple Flink job with all functions inlined in the main method. The main
class has some static fields, such as an HTTP client and a Guava cache, to
avoid sending the same requests to external APIs repeatedly.

Everything works fine in the IDE. However, after refactoring the functions
into separate classes I got a NotSerializableException, because evidently
one cannot inject anything into a function unless it is serializable, and
the Guava cache isn't.
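A minimal, stdlib-only sketch of the failure described above. Flink ships user function objects to the workers using Java serialization, so every non-transient field of a function must itself be serializable. ExpensiveClient, BrokenFunction, FixedFunction and SerializationCheck are hypothetical names; ExpensiveClient stands in for the HTTP client or Guava cache, and no Flink classes are used so the example runs on a plain JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable dependency (HTTP client, Guava cache, ...).
class ExpensiveClient {
    String fetch(String key) {
        return "value-for-" + key;
    }
}

// Hypothetical function class: the non-transient field drags ExpensiveClient into serialization.
class BrokenFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ExpensiveClient client = new ExpensiveClient();
}

// Same class with the field marked transient: Java serialization skips it.
// After deserialization the field is null (initializers do not run) and must be re-created.
class FixedFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient ExpensiveClient client = new ExpensiveClient();
}

class SerializationCheck {
    // Returns true if obj can be written with Java serialization.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BrokenFunction())); // false
        System.out.println(isSerializable(new FixedFunction()));  // true
    }
}
```

Marking the field transient only avoids the exception; the dependency still has to be created again on the worker side.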

Am I right in assuming that every object a function needs must be created
by the function itself?
What is the right way to instantiate those objects (in the constructor or in
the function's open() method)?
Can functions have non-serializable fields at all?

What are the best practices for managing dependencies in Flink programs?

Could you also point me to a tutorial for beginners?

Kind regards,
Pan

Re: Basic question about flink programs

Posted by KristoffSC <kr...@gmail.com>.
Hi Arvid,
Thanks for your answer. I took this approach. 


I did not want to start a new thread since I wanted to avoid "subject
duplication" :)

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Basic question about flink programs

Posted by Arvid Heise <ar...@ververica.com>.
Hi KristoffSC,

It would be better to open a new thread. Few users regularly check threads
on the user list that are more than a year old.

In general, if you have a cache, you usually don't want to serialize it. So
add the cache as a field inside the respective function (rewriting a lambda
as an anonymous class, since a lambda cannot declare fields) and mark the
field transient so that it is skipped during serialization. Be aware that you
usually want to initialize the cache in the function's open() method, so you
need to use a RichXFunction (for example, RichMapFunction).
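The stdlib-only Java sketch below imitates the lifecycle this advice relies on: the function object is constructed in main(), serialized, deserialized on the worker, open() is called once per instance, and then map() runs per record. CachingEnricher, LifecycleDemo, callExternalApi and MAX_ENTRIES are hypothetical names, and an access-ordered LinkedHashMap stands in for the Guava cache so that the example runs without Flink on the classpath. In a real job the class would extend RichMapFunction and override open() (its exact signature depends on the Flink version).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical analogue of a RichMapFunction<String, String> holding a transient cache.
class CachingEnricher implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final int MAX_ENTRIES = 1_000; // hypothetical cache bound

    // Skipped by serialization; every deserialized instance builds its own in open().
    private transient Map<String, String> cache;
    private transient int remoteCalls;

    // Plays the role of the rich function's open(): runs once per instance,
    // after deserialization and before the first record.
    void open() {
        // Access-ordered LinkedHashMap used as a simple LRU cache (stand-in for Guava).
        cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > MAX_ENTRIES;
            }
        };
        remoteCalls = 0;
    }

    // Plays the role of map(): serve from the cache, call the API only on a miss.
    String map(String key) {
        return cache.computeIfAbsent(key, this::callExternalApi);
    }

    // Hypothetical stand-in for the HTTP request to the external API.
    private String callExternalApi(String key) {
        remoteCalls++;
        return key.toUpperCase();
    }

    int remoteCalls() {
        return remoteCalls;
    }
}

class LifecycleDemo {
    // Simulates shipping the function from the client to a worker via Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CachingEnricher onClient = new CachingEnricher(); // built in main(), no cache yet
        CachingEnricher onWorker = roundTrip(onClient);   // what the task receives
        onWorker.open();                                  // lifecycle hook creates the cache
        System.out.println(onWorker.map("a") + onWorker.map("a") + onWorker.map("b")); // AAB
        System.out.println(onWorker.remoteCalls());                                     // 2
    }
}
```

Creating the cache in open() rather than in the constructor means it is built on the worker, after deserialization, so nothing about it ever has to be serialized.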

Best,

Arvid

On Fri, Dec 6, 2019 at 1:23 PM KristoffSC <kr...@gmail.com>
wrote:

> Hi,
> I'm having the same problem now. What is your approach now after gaining
> some experience?
>
> Also, do you use Spring DI to set up/initialize your jobs/process functions?

Re: Basic question about flink programs

Posted by KristoffSC <kr...@gmail.com>.
Hi,
I'm having the same problem now. What is your approach now after gaining
some experience?

Also, do you use Spring DI to set up/initialize your jobs/process functions?




Re: Basic question about flink programs

Posted by dy...@salecycle.com.
In general, I'd expect every class with state that you use in Flink to be serialised, so you should mark your classes as Serializable and set a serialVersionUID.

I have what sounds like a very similar problem to yours. I need to use a non-serializable component in my streaming Flink job; it is also a Guava cache. Only one area of the application requires this cache, i.e. only one class calls into it.

In the serializable class that requires access to the cache, I've handled it like I would any non-serializable property: the field that references the cache container class is marked transient, and a hook into the deserialization process (https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html) initialises the cache container when the object is deserialized.
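A minimal sketch of this approach, assuming the hook is the private readObject(ObjectInputStream) method documented in the java.io.Serializable Javadoc (the message does not name the method). CacheContainer, CacheUser and ReadObjectDemo are hypothetical names; a ConcurrentHashMap wrapped in a non-serializable class stands in for the Guava-backed container. The class also declares a serialVersionUID, as recommended at the start of this message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical non-serializable cache container (stand-in for a class wrapping a Guava cache).
class CacheContainer {
    private final ConcurrentHashMap<String, String> entries = new ConcurrentHashMap<>();

    String get(String key) {
        return entries.computeIfAbsent(key, k -> "value-for-" + k);
    }

    int size() {
        return entries.size();
    }
}

class CacheUser implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;              // ordinary state: serialized
    private transient CacheContainer cache; // skipped by serialization

    CacheUser(String name) {
        this.name = name;
        this.cache = new CacheContainer();
    }

    // Deserialization hook described in the java.io.Serializable Javadoc:
    // restore the non-transient fields first, then rebuild the transient cache.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        cache = new CacheContainer();
    }

    String lookup(String key) {
        return name + ":" + cache.get(key);
    }

    int cachedEntries() {
        return cache.size();
    }
}

class ReadObjectDemo {
    // Serializes and deserializes the object, as happens when it is shipped to a worker.
    static CacheUser roundTrip(CacheUser user) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(user);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CacheUser) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CacheUser original = new CacheUser("enricher");
        original.lookup("x");                     // warms the original's cache
        CacheUser copy = roundTrip(original);
        System.out.println(copy.cachedEntries()); // 0: fresh cache, entries were not serialized
        System.out.println(copy.lookup("x"));     // enricher:value-for-x
    }
}
```

Compared with initializing in open(), a readObject() hook works for any object restored with Java serialization, including helper classes that are not Flink functions, while open() ties the initialization to Flink's function lifecycle and is only available on rich functions.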
