Posted to user@flink.apache.org by M Singh <ma...@yahoo.com> on 2017/12/31 21:38:44 UTC

Apache Flink - Connected Stream with different number of partitions

Hi:
Referring to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html) for ConnectedStreams:
"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

If the two connected streams have a different number of partitions, e.g. someStream has 4 and otherStream has 2, how are the elements of the streams distributed to the CoMapFunction:
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
I believe that if the second stream is broadcast, each partition of the first will get all the elements of the second. Is my understanding correct?
If the streams are not broadcast, and the first stream has 4 partitions while the second has 2, how are the elements of the second stream distributed to the partitions of the first?
Also, if the streams are not broadcast but have the same number of partitions, how are the elements distributed?
Thanks
Mans




Re: Apache Flink - Connected Stream with different number of partitions

Posted by M Singh <ma...@yahoo.com>.
Thanks, Aljoscha and Timo, for your answers. I will try to digest the pointers you provided.
Mans 



Re: Apache Flink - Connected Stream with different number of partitions

Posted by Aljoscha Krettek <al...@apache.org> on January 3, 2018.
Hi,

Timo's answer is correct, but I'll try to elaborate a bit. In this case, the way data is sent to downstream operations depends on several things:

 - the parallelism of the first input operation
 - the parallelism of the second input operation
 - the parallelism of the co-operation
 - the transmission pattern on the first input (broadcast, rebalance, etc.)
 - the transmission pattern on the second input

Note that "streams" have no parallelism of their own: technically there are no streams, only operations that are interconnected in a certain way.

Now, if the input parallelism and the operation parallelism are the same and you don't specify a transmission pattern, data will not be "shuffled" between the operations. If you specify broadcast or rebalance, you get that pattern instead; with broadcast, for example, every element from an input operator instance is sent to every instance of the downstream operation.
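A minimal sketch of these transmission patterns, written as a plain-Java simulation rather than Flink code. The class RoutingModel, the enum Pattern and the methods route/receivedPerInstance are hypothetical names for illustration; the model assumes each sending instance keeps its own round-robin position starting at 0, which may differ from what Flink actually does. It replays the question's scenario (first input parallelism 4, second input parallelism 2, co-operation parallelism 4):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model (not Flink's implementation) of how one input of a
 * two-input operation is routed to the instances of that operation.
 */
final class RoutingModel {

    enum Pattern { FORWARD, REBALANCE, BROADCAST }

    // Round-robin position of one sending instance; only REBALANCE uses it (assumed to start at 0).
    private int nextTarget = 0;

    /** Downstream instances that receive one element emitted by upstream instance `sender`. */
    List<Integer> route(Pattern pattern, int sender, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        switch (pattern) {
            case FORWARD:
                // Instance i feeds instance i, so both sides need the same parallelism.
                if (upstreamParallelism != downstreamParallelism) {
                    throw new IllegalArgumentException("FORWARD requires equal parallelism");
                }
                targets.add(sender);
                break;
            case REBALANCE:
                // Each sender cycles over all downstream instances.
                targets.add(nextTarget);
                nextTarget = (nextTarget + 1) % downstreamParallelism;
                break;
            case BROADCAST:
                // Every downstream instance gets a copy of the element.
                for (int i = 0; i < downstreamParallelism; i++) {
                    targets.add(i);
                }
                break;
        }
        return targets;
    }

    /** Counts the elements each downstream instance receives from one input. */
    static int[] receivedPerInstance(Pattern pattern, int upstreamParallelism,
                                     int downstreamParallelism, int elementsPerSender) {
        int[] received = new int[downstreamParallelism];
        for (int sender = 0; sender < upstreamParallelism; sender++) {
            RoutingModel state = new RoutingModel(); // each sender has its own round-robin state
            for (int e = 0; e < elementsPerSender; e++) {
                for (int target : state.route(pattern, sender, upstreamParallelism, downstreamParallelism)) {
                    received[target]++;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Question's scenario: first input parallelism 4, second input parallelism 2,
        // co-operation parallelism 4; every sending instance emits 4 elements.
        System.out.println("first, forward:    " + Arrays.toString(receivedPerInstance(Pattern.FORWARD, 4, 4, 4)));
        System.out.println("second, rebalance: " + Arrays.toString(receivedPerInstance(Pattern.REBALANCE, 2, 4, 4)));
        System.out.println("second, broadcast: " + Arrays.toString(receivedPerInstance(Pattern.BROADCAST, 2, 4, 4)));
    }
}
```

In this model, broadcasting the second input gives every co-operation instance all 8 of its elements (matching the understanding in the original question), while rebalance gives each instance only a share (2 of the 8).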

Best,
Aljoscha


Re: Apache Flink - Connected Stream with different number of partitions

Posted by Timo Walther <tw...@apache.org> on January 3, 2018.
Hi Mans,

I did a quick test on my PC where I simply set breakpoints in map1 and 
map2 (someStream has parallelism 1, otherStream 5, my CoMapFunction 8). 
Elements of someStream end up in different CoMapTasks (2/8, 7/8 etc.).

So I guess the distribution is round-robin partitioning. @Aljoscha might 
know more about the internals?
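To make the round-robin guess concrete, here is a small simulation of the setup above. It is a hypothetical model, not Flink's code, and it rests on an assumption: when no transmission pattern is set, an edge between operations with equal parallelism forwards (instance i to instance i), and an edge with different parallelism is rebalanced round-robin. As far as I know this matches Flink's default, and it is consistent with elements of someStream showing up in several CoMap tasks. The class DefaultRoutingModel, the method distribute and the start offset 0 are made up for illustration.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Flink's code) of the routing used when no transmission
 * pattern is set, assuming forward for equal parallelism and round-robin
 * rebalance otherwise.
 */
final class DefaultRoutingModel {

    /** Elements received per downstream instance when every upstream instance emits n elements. */
    static int[] distribute(int upstreamParallelism, int downstreamParallelism, int elementsPerSender) {
        int[] received = new int[downstreamParallelism];
        boolean forward = upstreamParallelism == downstreamParallelism;
        for (int sender = 0; sender < upstreamParallelism; sender++) {
            // Assumed start offset 0; the real starting instance may differ.
            int next = 0;
            for (int e = 0; e < elementsPerSender; e++) {
                if (forward) {
                    received[sender]++;   // instance i -> instance i
                } else {
                    received[next]++;     // round-robin over all receivers
                    next = (next + 1) % downstreamParallelism;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Test setup from above: someStream parallelism 1, otherStream 5, CoMapFunction 8.
        System.out.println("someStream:  " + Arrays.toString(distribute(1, 8, 16)));
        System.out.println("otherStream: " + Arrays.toString(distribute(5, 8, 8)));
    }
}
```

Under these assumptions, the single someStream instance spreads its elements over all 8 CoMap tasks, while equal parallelism on both sides (e.g. 4 and 4) would keep each element on the instance with the same index.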

Regards,
Timo


