Posted to user@flink.apache.org by Theodore Vasiloudis <th...@gmail.com> on 2016/08/05 00:56:10 UTC

Having a single copy of an object read in a RichMapFunction

Hello all,

for a prototype we are looking into, we would like to read a big matrix from
HDFS and, for every element that arrives in a stream of vectors, do a
multiplication with the matrix. The matrix should fit in the memory of one
machine.

We can read in the matrix using a RichMapFunction, but that would mean
that a copy of the matrix is made for each Task Slot AFAIK, if the
RichMapFunction is instantiated once per Task Slot.

So I'm wondering how we should try to address this problem: is it possible
to have just one copy of the object in memory per TM?

As a follow-up, if we have more than one TM per node, is it possible to
share memory between them? My guess is that we have to look at some
external store for that.

Cheers,
Theo

Re: Having a single copy of an object read in a RichMapFunction

Posted by Theodore Vasiloudis <th...@gmail.com>.
Thank you for the help Robert!

Regarding the static field alternative you provided, I'm a bit confused
about the difference between slots and instances.

When you say that by using a static field it will be shared by all
instances of the Map on the slot, does that mean that if the TM has
multiple slots, we again get multiple copies of the data?

Or does it mean that with a static field on a TM with multiple slots, we
only get one copy per TM, i.e. the slots share the same data?

As an example, if I have 1 TM with 3 slots, and I run the Map with a
parallelism of 3, I get 3 copies of the data on that TM. Does using a
static field change that?

On Mon, Aug 8, 2016 at 7:19 AM, Robert Metzger <rm...@apache.org> wrote:

> Hi Theo,
>
> I think there are some variants you can try out for the problem. I think
> it depends a bit on the performance characteristics you expect:
> - The simplest variant is to run one TM per machine with one slot only.
> This is probably not feasible because you can't use all the CPU cores
> - ... to solve that problem, you could use the same setup, but a worker
> thread pool, sharing one matrix per machine.
> - If you need higher parallelism in Flink, you could also have multiple
> slots per TM, and use a static field in your RichFlatMap class. The static
> field will be shared by all FlatMap instances of the slot.
>
> Flink doesn't have any built-in tooling for sharing memory between
> multiple TaskManagers on the same machine, but you can try to use anything
> available in Java (memory-mapped files, JNI, ...).

Re: Having a single copy of an object read in a RichMapFunction

Posted by Robert Metzger <rm...@apache.org>.
Hi Theo,

I think there are some variants you can try out for the problem. I think it
depends a bit on the performance characteristics you expect:
- The simplest variant is to run one TM per machine with one slot only.
This is probably not feasible because you can't use all the CPU cores
- ... to solve that problem, you could use the same setup, but a worker
thread pool, sharing one matrix per machine.
- If you need higher parallelism in Flink, you could also have multiple
slots per TM, and use a static field in your RichFlatMap class. The static
field will be shared by all FlatMap instances of the slot.
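
A minimal sketch of the static-field variant, under stated assumptions: the
names SharedMatrix, MultiplyFunction and SharedMatrixDemo are hypothetical,
the tiny hard-coded matrix stands in for the real read from HDFS, and
MultiplyFunction is a plain class standing in for a RichMapFunction or
RichFlatMapFunction so that the example runs without Flink on the classpath.
A static field exists once per class loader, so several function instances
running as threads in one JVM should end up sharing a single loaded matrix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder: one lazily loaded matrix per class loader (normally per JVM). */
final class SharedMatrix {
    // Counts how often the expensive load ran; only needed for the demonstration.
    static final AtomicInteger LOADS = new AtomicInteger();

    private static volatile double[][] matrix;

    private SharedMatrix() {}

    /** Returns the shared matrix, loading it on first use (double-checked locking). */
    static double[][] get() {
        double[][] m = matrix;
        if (m == null) {
            synchronized (SharedMatrix.class) {
                m = matrix;
                if (m == null) {
                    m = load();
                    matrix = m;
                }
            }
        }
        return m;
    }

    // Stand-in for reading the real matrix from HDFS (assumption).
    private static double[][] load() {
        LOADS.incrementAndGet();
        return new double[][] {{1.0, 2.0}, {3.0, 4.0}};
    }
}

/** Stand-in for the user function; in Flink, open() and map() would live in a rich function. */
final class MultiplyFunction {
    private double[][] matrix;

    void open() {
        // Every instance fetches the same reference instead of loading its own copy.
        matrix = SharedMatrix.get();
    }

    double[] map(double[] v) {
        double[] out = new double[matrix.length];
        for (int i = 0; i < matrix.length; i++) {
            double sum = 0.0;
            for (int j = 0; j < v.length; j++) {
                sum += matrix[i][j] * v[j];
            }
            out[i] = sum;
        }
        return out;
    }
}

final class SharedMatrixDemo {
    /** Runs three function instances on three threads, imitating three slots of one TM. */
    static List<double[]> runThreeInstances() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<double[]>> futures = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                futures.add(pool.submit(() -> {
                    MultiplyFunction f = new MultiplyFunction();
                    f.open();
                    return f.map(new double[] {1.0, 1.0});
                }));
            }
            List<double[]> results = new ArrayList<>();
            for (Future<double[]> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<double[]> results = runThreeInstances();
        System.out.println(results.size() + " results, loads = " + SharedMatrix.LOADS.get());
    }
}
```

In this sketch the load counter stays at 1 although three instances call
open(). Two caveats apply: the shared array must be treated as read-only by
all instances, and because the field is scoped to the class loader, a setup
that loads user code separately per job would hold one copy per job rather
than one per process.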

Flink doesn't have any built-in tooling for sharing memory between multiple
TaskManagers on the same machine, but you can try to use anything available
in Java (memory-mapped files, JNI, ...).
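
A minimal sketch of the memory-mapped-file option, assuming the matrix has
first been copied to a local file as row-major 8-byte doubles. The names
MappedMatrix and MappedMatrixDemo, the file layout and the temp file are
assumptions made for illustration; only standard java.nio calls are used.
Processes that map the same file read-only are normally served from the same
operating-system page cache, so several JVMs on one machine would not each
need a heap copy.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical read-only view of a row-major matrix of doubles stored in a local file. */
final class MappedMatrix {
    private final MappedByteBuffer buf;
    private final int rows;
    private final int cols;

    private MappedMatrix(MappedByteBuffer buf, int rows, int cols) {
        this.buf = buf;
        this.rows = rows;
        this.cols = cols;
    }

    /** Writes the matrix row by row as 8-byte doubles; meant to run once per machine. */
    static void write(Path file, double[][] m) {
        int rows = m.length;
        int cols = m[0].length;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer out =
                    ch.map(FileChannel.MapMode.READ_WRITE, 0, (long) rows * cols * Double.BYTES);
            for (double[] row : m) {
                for (double x : row) {
                    out.putDouble(x);
                }
            }
            out.force(); // flush the mapped region to the file
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Maps an existing file read-only; the mapping stays valid after the channel is closed. */
    static MappedMatrix open(Path file, int rows, int cols) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer in =
                    ch.map(FileChannel.MapMode.READ_ONLY, 0, (long) rows * cols * Double.BYTES);
            return new MappedMatrix(in, rows, cols);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Matrix-vector product using absolute reads, which do not move the buffer position. */
    double[] multiply(double[] v) {
        double[] out = new double[rows];
        for (int i = 0; i < rows; i++) {
            double sum = 0.0;
            for (int j = 0; j < cols; j++) {
                sum += buf.getDouble((i * cols + j) * Double.BYTES) * v[j];
            }
            out[i] = sum;
        }
        return out;
    }
}

final class MappedMatrixDemo {
    /** Writes a 2x2 matrix to a temp file, maps it, and multiplies it by (1, 1). */
    static double[] roundTrip() {
        try {
            Path file = Files.createTempFile("matrix", ".bin");
            file.toFile().deleteOnExit();
            MappedMatrix.write(file, new double[][] {{1.0, 2.0}, {3.0, 4.0}});
            return MappedMatrix.open(file, 2, 2).multiply(new double[] {1.0, 1.0});
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        double[] r = roundTrip();
        System.out.println(r[0] + ", " + r[1]);
    }
}
```

A single mapping made with FileChannel.map is limited to Integer.MAX_VALUE
bytes, so a matrix of several GB would need to be split across multiple
mappings, and the int index arithmetic in multiply() would have to change
accordingly.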


On Fri, Aug 5, 2016 at 7:10 PM, Sameer Wadkar <sa...@axiomine.com> wrote:

> You mean "Connected Streams"? I use that for the same requirement. The way
> it works, it looks like it creates multiple copies per co-map operation. I
> use the keyed version to match side inputs with the data.

Re: Having a single copy of an object read in a RichMapFunction

Posted by Sameer Wadkar <sa...@axiomine.com>.
You mean "Connected Streams"? I use that for the same requirement. The way it works, it looks like it creates multiple copies per co-map operation. I use the keyed version to match side inputs with the data.

Sent from my iPhone

> On Aug 5, 2016, at 12:36 PM, Theodore Vasiloudis <th...@gmail.com> wrote:
> 
> Yes this is a streaming use case, so broadcast is not an option.
> 
> If I get it correctly with connected streams I would emulate side input by "streaming" the matrix with a key that all incoming vector records match on?
> 
> Wouldn't that create multiple copies of the matrix in memory?

Re: Having a single copy of an object read in a RichMapFunction

Posted by Theodore Vasiloudis <th...@gmail.com>.
Yes this is a streaming use case, so broadcast is not an option.

If I get it correctly with connected streams I would emulate side input by
"streaming" the matrix with a key that all incoming vector records match on?

Wouldn't that create multiple copies of the matrix in memory?

On Thu, Aug 4, 2016 at 6:56 PM, Sameer W <sa...@axiomine.com> wrote:

> Theodore,
>
> Broadcast variables do that when using the DataSet API -
> http://data-artisans.com/how-to-factorize-a-700-gb-matrix-with-apache-flink/
>
> See the following lines in the article-
> To support the above presented algorithm efficiently we had to improve
> Flink’s broadcasting mechanism since it easily becomes the bottleneck of
> the implementation. The enhanced Flink version can share broadcast
> variables among multiple tasks running on the same machine. *Sharing
> avoids having to keep for each task an individual copy of the broadcasted
> variable on the heap. This increases the memory efficiency significantly,
> especially if the broadcasted variables can grow up to several GBs of size.*
>
> If you are using the DataStream API, then side-inputs (not yet
> implemented) would achieve the same as broadcast variables
> (https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#).
> I use keyed Connected Streams for one of my use-cases (propagating rule
> changes to the data), where I could otherwise have used side-inputs.
>
> Sameer

Re: Having a single copy of an object read in a RichMapFunction

Posted by Sameer W <sa...@axiomine.com>.
Theodore,

Broadcast variables do that when using the DataSet API -
http://data-artisans.com/how-to-factorize-a-700-gb-matrix-with-apache-flink/

See the following lines in the article-
To support the above presented algorithm efficiently we had to improve
Flink’s broadcasting mechanism since it easily becomes the bottleneck of
the implementation. The enhanced Flink version can share broadcast
variables among multiple tasks running on the same machine. *Sharing avoids
having to keep for each task an individual copy of the broadcasted variable
on the heap. This increases the memory efficiency significantly, especially
if the broadcasted variables can grow up to several GBs of size.*

If you are using the DataStream API, then side-inputs (not yet
implemented) would achieve the same as broadcast variables
(https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#).
I use keyed Connected Streams for one of my use-cases (propagating rule
changes to the data), where I could otherwise have used side-inputs.

Sameer




On Thu, Aug 4, 2016 at 8:56 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> Hello all,
>
> for a prototype we are looking into, we would like to read a big matrix
> from HDFS and, for every element that arrives in a stream of vectors, do a
> multiplication with the matrix. The matrix should fit in the memory of one
> machine.
>
> We can read in the matrix using a RichMapFunction, but that would mean
> that a copy of the matrix is made for each Task Slot AFAIK, if the
> RichMapFunction is instantiated once per Task Slot.
>
> So I'm wondering how we should try to address this problem: is it possible
> to have just one copy of the object in memory per TM?
>
> As a follow-up, if we have more than one TM per node, is it possible to
> share memory between them? My guess is that we have to look at some
> external store for that.
>
> Cheers,
> Theo
>