Posted to user@flink.apache.org by "Aggarwal, Ajay" <Aj...@netapp.com> on 2019/02/07 19:37:38 UTC

stream of large objects

In my use case, my source stream contains small messages, but as part of Flink processing I will be aggregating them into large messages, and further processing will happen on these large messages. The structure of a large message will be something like this:

   class LargeMessage {
       String key;
       List<String> messages; // the smaller messages are aggregated here
   }

In some cases the list field of a LargeMessage can get very large (thousands of messages). Is it OK to create an intermediate stream of these LargeMessages? What should I be concerned about while designing the Flink job, specifically with parallelism in mind? As these LargeMessages flow from one Flink subtask to another, do they get serialized and deserialized?

Thanks.


Re: stream of large objects

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
Thanks Konstantin. When serialization of events does become an issue because of size (say hundreds of MBs, or GBs), how does it manifest itself? Is it mostly latency, or something else?

Ajay


Re: stream of large objects

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Ajay,

When repartitioning the stream, the events need to be transferred between
TaskManagers (processes/nodes). Just passing a reference there won't work.

If it is serialization you are worried about and you don't need access to
the list of messages inside the job, you might as well store this list of
Strings as a byte[]. This will make serialization cheaper. Generally, events
of a few MBs should not be a problem by themselves.
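
As a rough illustration of the byte[] idea, here is a minimal sketch in plain Java. The class name PackedMessages, its methods, and the length-prefixed layout are hypothetical choices made for this example; they are not part of Flink or of the job discussed in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API): packs a list of strings into one byte[].
// Assumed layout: int count, then for each message an int length plus its UTF-8 bytes.
final class PackedMessages {

    static byte[] pack(List<String> messages) {
        List<byte[]> encoded = new ArrayList<>(messages.size());
        int total = Integer.BYTES; // space for the message count
        for (String message : messages) {
            byte[] utf8 = message.getBytes(StandardCharsets.UTF_8);
            encoded.add(utf8);
            total += Integer.BYTES + utf8.length; // length prefix + payload
        }
        ByteBuffer buffer = ByteBuffer.allocate(total);
        buffer.putInt(encoded.size());
        for (byte[] utf8 : encoded) {
            buffer.putInt(utf8.length);
            buffer.put(utf8);
        }
        return buffer.array();
    }

    // Reverses pack(); only needed at the point where the individual messages are read.
    static List<String> unpack(byte[] packed) {
        ByteBuffer buffer = ByteBuffer.wrap(packed);
        int count = buffer.getInt();
        List<String> messages = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] utf8 = new byte[buffer.getInt()];
            buffer.get(utf8);
            messages.add(new String(utf8, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

With something like this, the large event would carry the key plus one byte[] field instead of a List<String>, and unpack() would be called only in the operator that actually needs the individual messages.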

Cheers,

Konstantin


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Re: stream of large objects

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
I looked a little into broadcast state, and while it’s interesting I don’t think it will help me. Since broadcast state is kept entirely in memory, I am worried about the memory requirement if I make all these LargeMessages part of broadcast state. Furthermore, these LargeMessages need to be processed in a keyed context, so sharing all of them across all downstream tasks does not seem efficient.


Re: stream of large objects

Posted by Chesnay Schepler <ch...@apache.org>.
The Broadcast State 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/broadcast_state.html#the-broadcast-state-pattern> 
may be interesting to you.


Re: stream of large objects

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
Yes, another keyBy will be used. The “small size” messages will be strings of 500 to 1000 characters.

Is there a concept of “global” state in Flink? Is it possible to keep these lists in global state and only pass a reference to the list (by name?) in the LargeMessage?
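
For a sense of scale, the figures in this thread can be turned into a back-of-the-envelope estimate. The sketch below is plain arithmetic in Java; it assumes that "thousands of messages" means somewhere between 1,000 and 10,000, and that the strings are ASCII (one byte per character in UTF-8). Both are assumptions made for illustration, not numbers stated in the thread, and the class and method names are hypothetical.

```java
// Hypothetical helper: rough payload size of one aggregated event.
final class SizeEstimate {

    // Payload bytes for messageCount strings of charsPerMessage characters each,
    // ignoring the key and any per-message framing overhead.
    static long estimateBytes(int messageCount, int charsPerMessage, int bytesPerChar) {
        return (long) messageCount * charsPerMessage * bytesPerChar;
    }

    public static void main(String[] args) {
        long low = estimateBytes(1_000, 500, 1);     // assumed lower bound
        long high = estimateBytes(10_000, 1_000, 1); // assumed upper bound
        System.out.println("low estimate (bytes):  " + low);
        System.out.println("high estimate (bytes): " + high);
    }
}
```

Under these assumptions a single LargeMessage carries roughly 0.5 MB to 10 MB of message payload.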


Re: stream of large objects

Posted by Chesnay Schepler <ch...@apache.org>.
Whether a LargeMessage is serialized depends on how the job is structured.
For example, if you only apply map/filter functions after the
aggregation, it is likely that the messages won't be serialized.
If you apply another keyBy, they will be serialized again.
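
The difference between the two cases can be illustrated outside of Flink with a small plain-Java sketch. It is a simplified stand-in, not Flink code: the names HandOffDemo, localHandOff and remoteHandOff are hypothetical, and Java serialization is used only because it ships with the standard library, whereas Flink uses its own type serializers for records. How Flink hands records between operators in the same task also depends on its configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: in-process hand-off versus a hand-off that crosses a process boundary.
final class HandOffDemo {

    // Simplified stand-in for the LargeMessage from the original post.
    static final class LargeMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String key;
        final List<String> messages;

        LargeMessage(String key, List<String> messages) {
            this.key = key;
            this.messages = new ArrayList<>(messages);
        }
    }

    // Same process: the next function can simply receive the object.
    static LargeMessage localHandOff(LargeMessage message) {
        return message;
    }

    // Different process: the object has to be turned into bytes and rebuilt on the
    // other side, so the receiver ends up with a copy and the cost grows with size.
    static LargeMessage remoteHandOff(LargeMessage message) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(message);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LargeMessage) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

The second method is the situation a keyBy creates whenever the target subtask runs in another TaskManager: a reference is meaningless there, so the full list has to travel as bytes.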

When you say "small size" messages, what are we talking about here?
