Posted to user@flink.apache.org by M Singh <ma...@yahoo.com> on 2017/12/25 18:12:44 UTC

Apache Flink - broadcasting DataStream

Hi,
I am trying to understand the broadcast functionality for DataStream. The documentation indicates that it 'Broadcasts elements to every partition.'
Since streams are unbounded, my questions are:
1. What elements get broadcast to the partitions?
2. What happens as new elements are added to the stream? Are only the new elements broadcast?
3. Since the broadcast operation returns a DataStream, can it be used in a join? If so, how do new (and old) elements affect the join results?
4. Similarly, how does broadcast work with connected streams?
If there are some examples, please let me know.
Thanks,
Mans

Re: Apache Flink - broadcasting DataStream

Posted by M Singh <ma...@yahoo.com>.
Thanks, Fabian and Ufuk, for your answers.

Re: Apache Flink - broadcasting DataStream

Posted by Fabian Hueske <fh...@gmail.com> on Thursday, January 4, 2018 12:32 AM.
Hi,

A broadcast replicates each element as many times as the parallelism of the
following operator, i.e., each parallel instance of the following operator
receives all events of its input stream.
If the operation following a broadcast is a connect followed by a co-operator
(and the other input is not broadcast), the operator can be used for arbitrary
joins (not only equi-joins).
Depending on the join semantics, the joining operator needs to keep events
from one input (broadcast or non-broadcast) or from both inputs in state to be
able to perform the join.
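The following is a minimal, framework-free Java sketch of the pattern described above; it is a simulation, not Flink's API. It models a co-operator with a given parallelism whose first input is distributed round-robin and whose second input is broadcast. Each simulated instance keeps both inputs in local state (the "both inputs as state" case), so an arbitrary predicate such as `left > right` can be evaluated. The class names `BroadcastThetaJoin` and `Instance` and the round-robin distribution are illustrative assumptions; in Flink itself this corresponds roughly to `partitioned.connect(other.broadcast())` followed by a `CoFlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Framework-free simulation (not Flink code) of a co-operator whose first
// input is distributed across parallel instances and whose second input is
// broadcast to every instance.
class BroadcastThetaJoin<A, B> {

    // One parallel instance of the co-operator. It buffers both inputs in
    // local "state", so a pair matches no matter which side arrives first.
    private final class Instance {
        final List<A> leftState = new ArrayList<>();
        final List<B> rightState = new ArrayList<>();

        void onLeft(A a) {
            leftState.add(a);
            for (B b : rightState) {
                if (predicate.test(a, b)) results.add(a + "|" + b);
            }
        }

        void onRight(B b) {
            rightState.add(b);
            for (A a : leftState) {
                if (predicate.test(a, b)) results.add(a + "|" + b);
            }
        }
    }

    private final List<Instance> instances = new ArrayList<>();
    private final BiPredicate<A, B> predicate; // any join condition, not only equality
    private final List<String> results = new ArrayList<>();
    private int nextLeftTarget = 0; // round-robin cursor for the non-broadcast input

    BroadcastThetaJoin(int parallelism, BiPredicate<A, B> predicate) {
        this.predicate = predicate;
        for (int i = 0; i < parallelism; i++) instances.add(new Instance());
    }

    // Non-broadcast input: each element goes to exactly one instance.
    void sendLeft(A a) {
        instances.get(nextLeftTarget).onLeft(a);
        nextLeftTarget = (nextLeftTarget + 1) % instances.size();
    }

    // Broadcast input: each element is replicated to every instance.
    void sendRight(B b) {
        for (Instance inst : instances) inst.onRight(b);
    }

    List<String> results() {
        return results;
    }
}
```

Because every left element lives on exactly one instance and every instance sees every right element, each (left, right) pair is evaluated exactly once, which is why the condition does not have to be an equality. The price is that every instance stores the whole broadcast input.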

Best, Fabian


Re: Apache Flink - broadcasting DataStream

Posted by M Singh <ma...@yahoo.com> on 2017-12-30 23:32 GMT+01:00.
Hi Ufuk,
Thanks for your explanation.
I can understand broadcasting a small, immutable dataset to the subtasks so that it can be joined with a stream.
However, I am still trying to understand how each broadcast element from a stream would be used in a join with another stream. Is this just an optimization over joining two streams?
Also, I believe that subtasks operate on partitions of a stream and that only equi-joins are possible for streams. So what is the reason we would want to broadcast each element to all the subtasks?
Thanks again.

Re: Apache Flink - broadcasting DataStream

Posted by Ufuk Celebi <uc...@apache.org> on Wednesday, December 27, 2017 12:52 AM.
Hey Mans!

This refers to how subtasks are connected to each other in your
program. If you have a single subtask A1 and three subtasks B1, B2,
and B3, broadcast will emit each incoming record at A1 to all of B1, B2, and B3:

A1 --+-> B1
     +-> B2
     +-> B3

Does this help?
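The A1 to B1, B2, B3 fan-out can be modeled in a few lines of plain Java. This is a framework-free sketch, not Flink code: `BroadcastFanOut` and its per-subtask inbox lists are hypothetical stand-ins for the network channels between subtasks. In a Flink program the same routing is requested by calling `broadcast()` on a `DataStream`.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free model (not Flink code) of broadcast partitioning:
// every record emitted by the upstream subtask is delivered to all
// downstream subtasks.
class BroadcastFanOut {

    // inboxes.get(i) collects the records received by downstream subtask i.
    private final List<List<String>> inboxes = new ArrayList<>();

    BroadcastFanOut(int downstreamParallelism) {
        for (int i = 0; i < downstreamParallelism; i++) {
            inboxes.add(new ArrayList<>());
        }
    }

    // Called once per record emitted by the upstream subtask (A1).
    void emit(String record) {
        for (List<String> inbox : inboxes) {
            inbox.add(record); // one copy per downstream subtask
        }
    }

    List<String> inbox(int subtask) {
        return inboxes.get(subtask);
    }
}
```

Creating `new BroadcastFanOut(3)` and calling `emit("x")` then `emit("y")` leaves `x` followed by `y` in each of the three inboxes: every record is copied once per downstream subtask.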

On Mon, Dec 25, 2017 at 7:12 PM, M Singh <ma...@yahoo.com> wrote:
> 1. What elements get broadcast to the partitions?

Each incoming element is broadcast.

> 2. What happens as new elements are added to the stream? Are only the new
> elements broadcast?

Yes. Each incoming element is broadcast individually as it arrives; earlier
elements are not re-sent (there is no history).
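Because nothing is replayed, a downstream subtask that needs earlier broadcast elements (for example, to join against them later) has to keep them itself. The hypothetical `BroadcastSubtask` below is a plain-Java sketch, not a Flink API, that contrasts a subtask that keeps no history with one that stores every element it receives.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch (not Flink code) of one downstream subtask of a
// broadcast. The channel hands over one element per call and never re-sends
// earlier elements, so "history" exists only if the subtask stores it.
class BroadcastSubtask {

    private final boolean keepHistory;
    private final List<String> state = new ArrayList<>(); // subtask-local copy

    BroadcastSubtask(boolean keepHistory) {
        this.keepHistory = keepHistory;
    }

    // Called once for every broadcast element, in arrival order.
    // Returns a snapshot of what the subtask knows after this element.
    List<String> onBroadcastElement(String element) {
        if (!keepHistory) {
            state.clear(); // no state kept: only the current element is visible
        }
        state.add(element);
        return new ArrayList<>(state);
    }
}
```

After receiving `e1`, `e2`, and `e3`, the history-free subtask only knows `[e3]`, while the accumulating one knows `[e1, e2, e3]`. Older elements can therefore only influence a join if the operator stored them.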

> 3. Since the broadcast operation returns a DataStream, can it be used in a join?
> If so, how do new (and old) elements affect the join results?

Yes, it should be. Every element is broadcast only once.

> 4. Similarly, how does broadcast work with connected streams?

The same as with non-connected streams: the incoming records are emitted to
every downstream partition.
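For connected streams, one possible layout is a first input partitioned by key and a second input that is broadcast. The plain-Java sketch below (hypothetical names, not Flink code) models only the routing: each subtask receives a disjoint slice of the keyed input and a full copy of the broadcast input. In Flink this layout would come from something like `keyed.connect(other.broadcast())`; the hash-modulo routing here is a simplification of Flink's actual key-group assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free model (not Flink code) of the routing for two connected
// inputs: the first is hash-partitioned by key, the second is broadcast.
class ConnectedRouting {

    final List<List<String>> keyedInbox = new ArrayList<>();
    final List<List<String>> broadcastInbox = new ArrayList<>();

    ConnectedRouting(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            keyedInbox.add(new ArrayList<>());
            broadcastInbox.add(new ArrayList<>());
        }
    }

    // Keyed input: each record goes to exactly one subtask, chosen by key hash
    // (simplified; Flink first maps keys to key groups).
    void sendKeyed(String key) {
        int target = Math.floorMod(key.hashCode(), keyedInbox.size());
        keyedInbox.get(target).add(key);
    }

    // Broadcast input: every subtask receives a copy.
    void sendBroadcast(String record) {
        for (List<String> inbox : broadcastInbox) {
            inbox.add(record);
        }
    }
}
```

Summed over all subtasks, the keyed inboxes contain each keyed record exactly once, while every broadcast inbox contains every broadcast record.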

– Ufuk