Posted to dev@flink.apache.org by "Kunft, Andreas" <an...@tu-berlin.de> on 2016/06/08 11:45:37 UTC

Broadcast data sent increases with # slots per TM

Hi,


We are seeing an unexpected increase in the amount of data sent over the network for broadcasts as the number of slots per TaskManager increases.


We provide a benchmark [1]. Adding slots not only increases the amount of data sent over the network but also hurts performance, as the preliminary results below show. In these results, cloud-11 has 25 nodes and ibm-power has 8 nodes, and the number of slots per node is scaled from 1 to 16.


+-----------------------+--------------+-------------+
| suite                 | name         | median_time |
+=======================+==============+=============+
| broadcast.cloud-11    | broadcast.01 |        8796 |
| broadcast.cloud-11    | broadcast.02 |       14802 |
| broadcast.cloud-11    | broadcast.04 |       30173 |
| broadcast.cloud-11    | broadcast.08 |       56936 |
| broadcast.cloud-11    | broadcast.16 |      117507 |
| broadcast.ibm-power-1 | broadcast.01 |        6807 |
| broadcast.ibm-power-1 | broadcast.02 |        8443 |
| broadcast.ibm-power-1 | broadcast.04 |       11823 |
| broadcast.ibm-power-1 | broadcast.08 |       21655 |
| broadcast.ibm-power-1 | broadcast.16 |       37426 |
+-----------------------+--------------+-------------+
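
To put the table in relative terms, here is a minimal Python sketch that computes each run's median time relative to the 1-slot run of the same suite. It assumes that the numeric suffix of each benchmark name (01 to 16) is the number of slots per node. The post does not state the unit of median_time, so only ratios are computed. The names `median_time` and `slowdown` are illustrative.

```python
# Median times copied from the table above (unit not stated in the post).
# Assumption: the suffix of each benchmark name is the slots-per-node count.
median_time = {
    "cloud-11": {1: 8796, 2: 14802, 4: 30173, 8: 56936, 16: 117507},
    "ibm-power-1": {1: 6807, 2: 8443, 4: 11823, 8: 21655, 16: 37426},
}


def slowdown(times):
    """Return each median time divided by the 1-slot median time."""
    base = times[1]
    return {slots: round(t / base, 2) for slots, t in times.items()}


for suite, times in median_time.items():
    print(suite, slowdown(times))
```

With these numbers, the 16-slot run takes roughly 13 times as long as the 1-slot run on cloud-11 and roughly 5.5 times as long on ibm-power-1.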



After looking into the code base, it seems that the data is deserialized only once per TM, but the actual data is sent to every slot running the operator with broadcast variables and is simply discarded if it has already been deserialized.


I do not see a reason why the data can't be shared among the slots of a TM and therefore sent only once, but I guess it would require quite some changes to how BC sets are currently handled.
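
The difference between the two behaviours can be sketched with a simple traffic model. This is only a rough illustration under stated assumptions: one sender, every receiving slot on a remote TM, and a hypothetical broadcast set of 100 MB. The function `broadcast_bytes` and its parameters are made up for this sketch and are not part of Flink.

```python
def broadcast_bytes(set_size, task_managers, slots_per_tm, shared_per_tm=False):
    """Data received across the cluster for one broadcast set from one sender.

    Simplified model (an assumption, not Flink's actual accounting):
    - current behaviour: every slot on every TM receives its own copy;
    - shared behaviour: each TM receives a single copy for all its slots.
    """
    receivers_per_tm = 1 if shared_per_tm else slots_per_tm
    return set_size * task_managers * receivers_per_tm


# Hypothetical 100 MB broadcast set on a 25-node cluster.
for slots in (1, 2, 4, 8, 16):
    per_slot = broadcast_bytes(100, 25, slots)
    per_tm = broadcast_bytes(100, 25, slots, shared_per_tm=True)
    print(f"slots={slots:2d}  per-slot={per_slot} MB  per-TM={per_tm} MB")
```

In this model the per-slot traffic grows linearly with the number of slots, while the per-TM traffic stays constant.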


Are there any future plans regarding this and/or is there interest in this "feature"?


Best

Andreas


[1] https://github.com/TU-Berlin-DIMA/flink-broadcast



Re: Broadcast data sent increases with # slots per TM

Posted by Till Rohrmann <tr...@apache.org>.
Hi Felix,

thanks for all the work you've put into the design document and your
experiments. The Flink community has recently agreed to publish these
design documents as part of the FLIP (Flink improvement proposal) process
in the Flink wiki. It would be great if you could post your design document
there [1]. If you don't have access yet, then please send me your
confluence wiki account and I'll add you.

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Cheers,
Till

On Fri, Jul 22, 2016 at 12:40 AM, Felix Neutatz <ne...@googlemail.com>
wrote:

> Hi everybody,
>
> I found an issue with my first approach, so I couldn't run the
> experiments yet. In the design document I have summarized my ideas and my
> work on this issue over the last weeks.
>
> You can find the design document here:
>
> https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing
>
> I would highly appreciate any ideas or comments, and I am looking forward
> to the discussion so that we can finally solve this issue :)
>
> Best regards,
> Felix

Re: Broadcast data sent increases with # slots per TM

Posted by Felix Neutatz <ne...@googlemail.com>.
Hi everybody,

I found an issue with my first approach, so I couldn't run the
experiments yet. In the design document I have summarized my ideas and my
work on this issue over the last weeks.

You can find the design document here:
https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing

I would highly appreciate any ideas or comments, and I am looking forward
to the discussion so that we can finally solve this issue :)

Best regards,
Felix

2016-07-08 1:47 GMT+02:00 Felix Neutatz <ne...@googlemail.com>:

> Hi,
>
> I have already started to work on this issue and created a Jira for it:
> https://issues.apache.org/jira/browse/FLINK-4175
> I have already implemented a quick version which could solve it. I will
> run the experiments on the cluster first and will describe my approach on
> Monday :)
>
> Have a nice weekend,
> Felix
>
> P.S. for super curious people:
> https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7

Re: Broadcast data sent increases with # slots per TM

Posted by Felix Neutatz <ne...@googlemail.com>.
Hi,

I have already started to work on this issue and created a Jira for it:
https://issues.apache.org/jira/browse/FLINK-4175
I have already implemented a quick version which could solve it. I will run
the experiments on the cluster first and will describe my approach on
Monday :)

Have a nice weekend,
Felix

P.S. for super curious people:
https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7

2016-06-09 11:50 GMT+02:00 Felix Neutatz <ne...@googlemail.com>:

> Hi everybody,
>
> Could we use the org.apache.flink.api.common.cache.DistributedCache to
> work around this broadcast issue for the moment, until we have fixed it?
> Or do you think it won't scale either?
>
> Best regards,
> Felix

Re: Broadcast data sent increases with # slots per TM

Posted by Felix Neutatz <ne...@googlemail.com>.
Hi everybody,

Could we use the org.apache.flink.api.common.cache.DistributedCache to work
around this broadcast issue for the moment, until we have fixed it?
Or do you think it won't scale either?

Best regards,
Felix

2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Till is right. Broadcast joins currently materialize once per slot.
> Originally, the purely push-based runtime was not good enough to handle it
> differently.
>
> By now, we could definitely handle BC vars differently (only one slot per
> TM requests the data).
> For BC joins, the hash tables currently do not coordinate spilling, which
> means that we cannot do multiple joins through the same hash table.
>
>
> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
> > If I'm not mistaken, broadcast variables and broadcast inputs of joins
> > follow different code paths. Broadcast variables use additional input
> > channels and are read before the actual driver code runs. In contrast,
> > a join operation is a two-input operator where the join driver decides
> > how to handle the inputs (which one to read first as the build input).
> >
> > This also entails that the broadcast variable optimization, where each
> > task manager holds the data only once and copies of the data are
> > discarded (but are still transmitted multiple times to the TM), does not
> > apply to the broadcast join inputs. Here you should see a slightly worse
> > performance degradation with your initial benchmark if you increase the
> > number of slots.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
> > alexander.s.alexandrov@gmail.com> wrote:
> >
> > > > As far as I know, the reason why the broadcast variables are
> > > > implemented that way is that the senders would have to know which
> > > > sub-tasks are deployed to which TMs.
> > >
> > > As the broadcast variables are realized as additionally attached
> > > "broadcast channels", I am assuming that the same behavior will apply
> > > to broadcast joins as well.
> > >
> > > Is this the case?
> > >
> > > Regards,
> > > Alexander
> > >
> > >
> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.kunft@tu-berlin.de>:
> > >
> > > > Hi Till,
> > > >
> > > > thanks for the fast answer.
> > > > I'll think about a concrete way of implementing this and open a JIRA.
> > > >
> > > > Best
> > > > Andreas
> > > > ________________________________________
> > > > From: Till Rohrmann <tr...@apache.org>
> > > > Sent: Wednesday, June 8, 2016 15:53
> > > > To: dev@flink.apache.org
> > > > Subject: Re: Broadcast data sent increases with # slots per TM
> > > >
> > > > Hi Andreas,
> > > >
> > > > your observation is correct. The data is sent to each slot and the
> > > > receiving TM only materializes one copy of the data. The rest of the
> > > > data is discarded.
> > > >
> > > > As far as I know, the reason why the broadcast variables are
> > > > implemented that way is that the senders would have to know which
> > > > sub-tasks are deployed to which TMs. Only then can you decide for
> > > > which sub-tasks you can send the data together. Since the output
> > > > emitters are agnostic to the actual deployment, the necessary
> > > > information would have to be forwarded to them.
> > > >
> > > > Another problem is that if you pick one of the sub-tasks to receive
> > > > the broadcast set, then you have to make sure that this sub-task has
> > > > read and materialized the broadcast set before the other sub-tasks
> > > > start working. One could maybe first send the broadcast set to one
> > > > sub-task and then, once the BC set has been sent, send a kind of
> > > > acknowledge record to all other sub-tasks. That way, the other
> > > > sub-tasks would block until the broadcast set has been completely
> > > > transmitted. But here one has to make sure that the sub-task
> > > > receiving the BC set has been deployed and is not queued up for
> > > > scheduling.
> > > >
> > > > So there are some challenges to solve in order to optimize the BC
> > > > sets. Currently, nobody is working on it. If you want to start
> > > > working on it, then I would recommend opening a JIRA and writing a
> > > > design document for it.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
> > > andreas.kunft@tu-berlin.de
> > > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > > we experience some unexpected increase of data sent over the
> network
> > > for
> > > > > broadcasts with increasing number of slots per Taskmanager.
> > > > >
> > > > >
> > > > > We provided a benchmark [1]. It not only increases the size of data
> > > sent
> > > > > over the network but also hurts performance as seen in the
> > preliminary
> > > > > results below. In this results cloud-11 has 25 nodes and ibm-power
> > has
> > > 8
> > > > > nodes with scaling the number of slots per node from 1 - 16.
> > > > >
> > > > >
> > > > > +-----------------------+--------------+-------------+
> > > > > | suite                 | name         | median_time |
> > > > > +=======================+==============+=============+
> > > > > | broadcast.cloud-11    | broadcast.01 |        8796 |
> > > > > | broadcast.cloud-11    | broadcast.02 |       14802 |
> > > > > | broadcast.cloud-11    | broadcast.04 |       30173 |
> > > > > | broadcast.cloud-11    | broadcast.08 |       56936 |
> > > > > | broadcast.cloud-11    | broadcast.16 |      117507 |
> > > > > | broadcast.ibm-power-1 | broadcast.01 |        6807 |
> > > > > | broadcast.ibm-power-1 | broadcast.02 |        8443 |
> > > > > | broadcast.ibm-power-1 | broadcast.04 |       11823 |
> > > > > | broadcast.ibm-power-1 | broadcast.08 |       21655 |
> > > > > | broadcast.ibm-power-1 | broadcast.16 |       37426 |
> > > > > +-----------------------+--------------+-------------+
> > > > >
> > > > >
> > > > >
> > > > > After looking into the code base it, it seems that the data is
> > > > > de-serialized only once per TM, but the actual data is sent for all
> > > slots
> > > > > running the operator with broadcast vars and just gets discarded in
> > > case
> > > > > its already de-serialized.
> > > > >
> > > > >
> > > > > I do not see a reason the data can't be shared among the slots of a
> > TM
> > > > and
> > > > > therefore just sent once, but I guess it would require quite some
> > > changes
> > > > > bc sets are handled currently.
> > > > >
> > > > >
> > > > > Are there any future plans regarding this and/or is there interest
> in
> > > > this
> > > > > "feature"?
> > > > >
> > > > >
> > > > > Best
> > > > >
> > > > > Andreas?
> > > > >
> > > > >
> > > > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast?
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Broadcast data sent increases with # slots per TM

Posted by Stephan Ewen <se...@apache.org>.
Till is right. Broadcast joins currently materialize once per slot.
Originally, the purely push-based runtime was not good enough to handle it
differently.

By now, we could definitely handle BC vars differently (only one slot per
TM would request the data).
For BC joins, the hash tables currently do not coordinate spilling, which
means that we cannot run multiple joins through the same hash table.


On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <tr...@apache.org> wrote:

> If I'm not mistaken, then broadcast variables and broadcast inputs of joins
> follow different code paths. Broadcast variables use additional input
> channels and are read before the actual driver code runs. In contrast to
> that, a join operation is a two input operator where the join driver
> decides how to handle the inputs (which one to read first as build input).
>
> This also entails that the broadcast variable optimization, where each task
> manager holds the data only once and copies of the data are discarded (but
> they are transmitted multiple times to the TM), does not apply to the
> broadcast join inputs. Here you should see a slightly worse performance
> degradation with your initial benchmark if you increase the number of
> slots.
>
> Cheers,
> Till

Re: Broadcast data sent increases with # slots per TM

Posted by Till Rohrmann <tr...@apache.org>.
If I'm not mistaken, then broadcast variables and broadcast inputs of joins
follow different code paths. Broadcast variables use additional input
channels and are read before the actual driver code runs. In contrast to
that, a join operation is a two input operator where the join driver
decides how to handle the inputs (which one to read first as build input).

This also entails that the broadcast variable optimization, where each task
manager holds the data only once and copies of the data are discarded (but
they are transmitted multiple times to the TM), does not apply to the
broadcast join inputs. Here you should see a slightly worse performance
degradation with your initial benchmark if you increase the number of slots.

Cheers,
Till

On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
alexander.s.alexandrov@gmail.com> wrote:

> > As far as I know, the reason why the broadcast variables are implemented
> > that way is that the senders would have to know which sub-tasks are
> > deployed to which TMs.
>
> As the broadcast variables are realized as additionally attached "broadcast
> channels", I am assuming that the same behavior will apply for broadcast
> joins as well.
>
> Is this the case?
>
> Regards,
> Alexander

Re: Broadcast data sent increases with # slots per TM

Posted by Alexander Alexandrov <al...@gmail.com>.
> As far as I know, the reason why the broadcast variables are implemented
> that way is that the senders would have to know which sub-tasks are
> deployed to which TMs.

As the broadcast variables are realized as additionally attached "broadcast
channels", I am assuming that the same behavior will apply for broadcast
joins as well.

Is this the case?

Regards,
Alexander


2016-06-08 17:13 GMT+02:00 Kunft, Andreas <an...@tu-berlin.de>:

> Hi Till,
>
> thanks for the fast answer.
> I'll think about a concrete way of implementing this and open a JIRA.
>
> Best
> Andreas

Re: Broadcast data sent increases with # slots per TM

Posted by "Kunft, Andreas" <an...@tu-berlin.de>.
Hi Till,

thanks for the fast answer.
I'll think about a concrete way of implementing this and open a JIRA.

Best
Andreas
________________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Wednesday, 8 June 2016 15:53
To: dev@flink.apache.org
Subject: Re: Broadcast data sent increases with # slots per TM

Hi Andreas,

your observation is correct. The data is sent to each slot and the
receiving TM only materializes one copy of the data. The rest of the data
is discarded.

As far as I know, the reason why the broadcast variables are implemented
that way is that the senders would have to know which sub-tasks are
deployed to which TMs. Only then can you decide for which sub-tasks the
data can be sent together. Since the output emitters are agnostic to the
actual deployment, the necessary information would have to be forwarded to
them.

Another problem is that if you pick one of the sub-tasks to receive the
broadcast set, then you have to make sure that this sub-task has read and
materialized the broadcast set before the other sub-tasks start working.
One could maybe first send the broadcast set to one sub-task and then,
once the BC set has been sent, send a kind of acknowledgement record to
all other sub-tasks. That way, the other sub-tasks would block until the
broadcast set has been completely transmitted. But here one has to make
sure that the sub-task receiving the BC set has been deployed and is not
queued up for scheduling.

So there are some challenges to solve in order to optimize the BC sets.
Currently, nobody is working on it. If you want to start working on
it, then I would recommend opening a JIRA and writing a design
document for it.

Cheers,
Till


Re: Broadcast data sent increases with # slots per TM

Posted by Till Rohrmann <tr...@apache.org>.
Hi Andreas,

your observation is correct. The data is sent to each slot and the
receiving TM only materializes one copy of the data. The rest of the data
is discarded.
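
A rough way to see what this means for network volume is sketched below. It is a back-of-the-envelope model, not Flink code: the class `BroadcastVolume`, its method names, and the 100 MiB set size are assumptions made for illustration, and it assumes exactly one receiving sub-task per slot.

```java
// Hypothetical model, not part of Flink: estimates bytes shipped for one broadcast set.
final class BroadcastVolume {

    /** Bytes shipped when every slot receives its own copy (the behaviour described above). */
    static long perSlot(int taskManagers, int slotsPerTm, long broadcastBytes) {
        return (long) taskManagers * slotsPerTm * broadcastBytes;
    }

    /** Bytes shipped if each TaskManager received the broadcast set exactly once. */
    static long perTaskManager(int taskManagers, long broadcastBytes) {
        return (long) taskManagers * broadcastBytes;
    }

    public static void main(String[] args) {
        long size = 100L * 1024 * 1024; // assumed size of the broadcast set (100 MiB)
        int taskManagers = 25;          // node count of cloud-11 from the benchmark
        for (int slots : new int[] {1, 2, 4, 8, 16}) {
            System.out.printf("slots=%d perSlot=%d perTaskManager=%d%n",
                slots, perSlot(taskManagers, slots, size), perTaskManager(taskManagers, size));
        }
    }
}
```

Under these assumptions the shipped volume grows linearly with the number of slots, while the amount that is actually materialized per TM stays constant.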

As far as I know, the reason why the broadcast variables are implemented
that way is that the senders would have to know which sub-tasks are
deployed to which TMs. Only then can you decide for which sub-tasks the
data can be sent together. Since the output emitters are agnostic to the
actual deployment, the necessary information would have to be forwarded to
them.
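
The deployment information a sender would need can be pictured as a map from sub-task index to TM. The sketch below is purely illustrative and not Flink code: `DeploymentGrouping`, the string TM ids, and the rule of picking the lowest-indexed sub-task per TM are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink code: sub-task indices and TM ids are plain values.
final class DeploymentGrouping {

    /** Groups sub-task indices by the TaskManager they are deployed to. */
    static Map<String, List<Integer>> groupByTaskManager(Map<Integer, String> subtaskToTm) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        // Iterate in ascending sub-task order so that every group ends up sorted.
        new TreeMap<Integer, String>(subtaskToTm).forEach(
            (subtask, tm) -> groups.computeIfAbsent(tm, k -> new ArrayList<>()).add(subtask));
        return groups;
    }

    /** Picks the lowest-indexed sub-task per TaskManager as the only receiver of the data. */
    static List<Integer> representatives(Map<Integer, String> subtaskToTm) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> group : groupByTaskManager(subtaskToTm).values()) {
            result.add(group.get(0));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> deployment = Map.of(0, "tm-a", 1, "tm-a", 2, "tm-b", 3, "tm-b");
        System.out.println(groupByTaskManager(deployment));
        System.out.println(representatives(deployment));
    }
}
```

The point of the sketch is only that the grouping is trivial once the map exists; the hard part named above is getting that map to the output emitters in the first place.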

Another problem is that if you pick one of the sub-tasks to receive the
broadcast set, then you have to make sure that this sub-task has read and
materialized the broadcast set before the other sub-tasks start working.
One could maybe first send the broadcast set to one sub-task and then,
once the BC set has been sent, send a kind of acknowledgement record to
all other sub-tasks. That way, the other sub-tasks would block until the
broadcast set has been completely transmitted. But here one has to make
sure that the sub-task receiving the BC set has been deployed and is not
queued up for scheduling.
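
Within a single JVM, the blocking idea could look roughly like the sketch below. It is not how Flink implements broadcast sets: `SharedBroadcast` is a hypothetical class, a `CountDownLatch` stands in for the acknowledgement record, and the timeout is an assumed way to avoid waiting forever if the receiving sub-task never gets deployed.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Flink code: slots of one TM sharing a single broadcast set.
final class SharedBroadcast<T> {
    private final CountDownLatch ready = new CountDownLatch(1);
    private volatile List<T> data;

    /** Called by the single sub-task that actually receives the broadcast set. */
    void materialize(List<T> records) {
        data = List.copyOf(records);
        ready.countDown(); // plays the role of the acknowledgement record
    }

    /** Called by the other sub-tasks; returns empty if the set did not arrive in time. */
    Optional<List<T>> await(long timeoutMillis) {
        try {
            if (ready.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return Optional.of(data);
            }
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SharedBroadcast<String> bc = new SharedBroadcast<>();
        Thread waiter = new Thread(() -> System.out.println("waiter got " + bc.await(5_000)));
        waiter.start();
        bc.materialize(List.of("a", "b"));
        waiter.join();
    }
}
```

The empty result on timeout corresponds to the scheduling concern above: a waiting sub-task needs some way to notice that the designated receiver is still queued rather than running.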

So there are some challenges to solve in order to optimize the BC sets.
Currently, nobody is working on it. If you want to start working on
it, then I would recommend opening a JIRA and writing a design
document for it.

Cheers,
Till

On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <an...@tu-berlin.de>
wrote:

> Hi,
>
>
> we see an unexpected increase in the data sent over the network for
> broadcasts as the number of slots per TaskManager increases.
>
>
> We provided a benchmark [1]. The increase not only adds to the data sent
> over the network but also hurts performance, as seen in the preliminary
> results below. In these results, cloud-11 has 25 nodes and ibm-power has 8
> nodes, with the number of slots per node scaled from 1 to 16.
>
>
> +-----------------------+--------------+-------------+
> | suite                 | name         | median_time |
> +=======================+==============+=============+
> | broadcast.cloud-11    | broadcast.01 |        8796 |
> | broadcast.cloud-11    | broadcast.02 |       14802 |
> | broadcast.cloud-11    | broadcast.04 |       30173 |
> | broadcast.cloud-11    | broadcast.08 |       56936 |
> | broadcast.cloud-11    | broadcast.16 |      117507 |
> | broadcast.ibm-power-1 | broadcast.01 |        6807 |
> | broadcast.ibm-power-1 | broadcast.02 |        8443 |
> | broadcast.ibm-power-1 | broadcast.04 |       11823 |
> | broadcast.ibm-power-1 | broadcast.08 |       21655 |
> | broadcast.ibm-power-1 | broadcast.16 |       37426 |
> +-----------------------+--------------+-------------+
>
>
>
> After looking into the code base, it seems that the data is
> de-serialized only once per TM, but the actual data is sent to all slots
> running the operator with broadcast vars and just gets discarded in case
> it's already de-serialized.
>
>
> I do not see a reason the data can't be shared among the slots of a TM and
> therefore just sent once, but I guess it would require quite some changes
> to how BC sets are currently handled.
>
>
> Are there any future plans regarding this and/or is there interest in this
> "feature"?
>
>
> Best
>
> Andreas
>
>
> [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
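
The benchmark numbers quoted above can be turned into slowdown factors relative to the single-slot run. The sketch below only re-uses the values from the table; the class name `BroadcastSlowdown` is made up for illustration, and since the thread does not state the unit of median_time, only ratios are computed.

```java
// Values copied from the benchmark table; the unit of median_time is not stated in the thread.
final class BroadcastSlowdown {
    static final int[] SLOTS = {1, 2, 4, 8, 16};
    static final long[] CLOUD_11 = {8796, 14802, 30173, 56936, 117507};
    static final long[] IBM_POWER_1 = {6807, 8443, 11823, 21655, 37426};

    /** Median time at index i relative to the run with one slot per node. */
    static double slowdown(long[] medians, int i) {
        return (double) medians[i] / medians[0];
    }

    public static void main(String[] args) {
        for (int i = 0; i < SLOTS.length; i++) {
            System.out.printf("slots=%2d  cloud-11 x%.2f  ibm-power-1 x%.2f%n",
                SLOTS[i], slowdown(CLOUD_11, i), slowdown(IBM_POWER_1, i));
        }
    }
}
```

Going from 1 to 16 slots per node, the median time grows by a factor of roughly 13.4 on cloud-11 and roughly 5.5 on ibm-power-1.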