Posted to user@flink.apache.org by Pawel Bartoszek <pa...@gmail.com> on 2018/01/18 15:02:33 UTC

Scheduling of GroupByKey and CombinePerKey operations

Can I ask why some operations run in only one slot? I understand that file
writes should happen in only one slot, but the GroupByKey operation could be
distributed across all slots. I have around 20k distinct keys every
minute. Is there any way to break this operator chain?

I noticed that CombinePerKey operations that don't have IO-related
transformations are scheduled across all 32 slots.


My cluster has 32 slots across 2 task managers. I am running Beam 2.2 and Flink
1.3.2.
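A GroupByKey can run in only one slot even at parallelism 32 because, as far as we understand, the Beam Flink runner executes it as a keyed Flink operator: every record with the same key goes to the same subtask, so the number of busy subtasks is capped by the number of distinct keys that reach that particular GroupByKey. The following is a minimal, simplified model of that partitioning, assuming Flink-style key groups (key group = hash(key) mod max parallelism; subtask = key group * parallelism / max parallelism) and an assumed max parallelism of 128. The MD5-based `stand_in_hash` is a deliberate substitute for Flink's own murmur hash, and all names here (`subtask_for_key`, `records_per_subtask`, the key `"shard-0"`) are hypothetical.

```python
import hashlib
from collections import Counter

# Assumption: max parallelism of 128; not read from any real Flink config.
MAX_PARALLELISM = 128


def stand_in_hash(key: str) -> int:
    """Deterministic stand-in for Flink's murmur hash (NOT the real function)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:4], "big")


def subtask_for_key(key: str, parallelism: int,
                    max_parallelism: int = MAX_PARALLELISM) -> int:
    """Map a key to a key group, then map the key group to a subtask index."""
    key_group = stand_in_hash(key) % max_parallelism
    return key_group * parallelism // max_parallelism


def records_per_subtask(keys, parallelism: int) -> list:
    """Count how many records each subtask would receive for the given keys."""
    counts = Counter(subtask_for_key(k, parallelism) for k in keys)
    return [counts.get(i, 0) for i in range(parallelism)]


# ~20k distinct keys: records spread over the 32 subtasks.
many = records_per_subtask([f"key-{i}" for i in range(20_000)], parallelism=32)
# 20k records that all share one (hypothetical) key: a single busy subtask.
one = records_per_subtask(["shard-0"] * 20_000, parallelism=32)

print("busy subtasks, 20k distinct keys:", sum(1 for c in many if c > 0))
print("busy subtasks, one shared key:   ", sum(1 for c in one if c > 0))
```

If the records entering the GroupByKey in the chain below all share one key (or only a handful), the UI would look like the second case, regardless of how many distinct keys exist elsewhere in the pipeline.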

Operator chain (Flink web UI):
  GroupByKey -> ParMultiDo(WriteShardedBundles) -> ParMultiDo(Anonymous) ->
  xxx.pipeline.output.io.file.WriteWindowToFile-SumPlaybackBitrateResult2/TextIO.Write/WriteFiles/Reshuffle/Window.Into()/Window.Assign.out
  -> ParMultiDo(ReifyValueTimestamp) -> ToKeyedWorkItem

  Start Time:        2018-01-18, 13:56:28
  End Time:          2018-01-18, 14:37:14
  Duration:          40m 45s
  Bytes received:    149 MB
  Records received:  333,672
  Bytes sent:        70.8 MB
  Records sent:      19
  Parallelism:       32
  Tasks:             00320000
  Status:            RUNNING

Subtasks:

Start Time            End Time  Duration  Bytes received  Records received  Bytes sent  Records sent  Attempt  Host  Status
2018-01-18, 13:56:28            40m 45s   2.30 MB         0                 2.21 MB     0             1        xxx   RUNNING
2018-01-18, 13:56:28            40m 45s   2.30 MB         0                 2.21 MB     0             1        xxx   RUNNING
2018-01-18, 13:56:28            40m 45s   2.30 MB         0                 2.21 MB     0             1        xxx   RUNNING
2018-01-18, 13:56:28            40m 45s   77.5 MB         333,683           2.21 MB     20            1        xxx   RUNNING
2018-01-18, 13:56:28            40m 45s   2.30 MB         0                 2.21 MB     0             1        xxx   RUNNING
2018-01-18, 13:56:28            40m 45s   2.30 MB         0                 2.21 MB     0             1        xxx   RUNNING
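The subtask rows can be cross-checked against the operator totals to quantify the skew. Only six of the 32 subtask rows were pasted, so this sketch assumes that the 26 rows not shown look like the idle ones (2.30 MB received, 2.21 MB sent, 0 records). The `skew` helper (max load divided by mean load) is a hypothetical metric chosen for illustration, not something the Flink UI reports.

```python
# Figures copied from the Flink web UI rows above.
PARALLELISM = 32
IDLE_RECEIVED_MB, IDLE_SENT_MB = 2.30, 2.21   # each idle subtask row
HOT_RECEIVED_MB, HOT_RECORDS = 77.5, 333_683  # the single busy subtask

# Assumption: the 26 subtask rows that were not pasted match the idle rows shown.
records = [HOT_RECORDS] + [0] * (PARALLELISM - 1)
total_received_mb = (PARALLELISM - 1) * IDLE_RECEIVED_MB + HOT_RECEIVED_MB
total_sent_mb = PARALLELISM * IDLE_SENT_MB


def skew(counts):
    """Max/mean load: 1.0 when perfectly balanced, len(counts) when one subtask does everything."""
    mean = sum(counts) / len(counts)
    return max(counts) / mean


print(f"bytes received ~ {total_received_mb:.1f} MB (UI shows 149 MB)")
print(f"bytes sent     ~ {total_sent_mb:.1f} MB (UI shows 70.8 MB)")
print(f"record skew    = {skew(records):.1f} (parallelism is {PARALLELISM})")
```

Under that assumption the totals agree with the operator row to within the UI's rounding, and the record skew equals the parallelism, meaning one subtask receives every record. With the ~20k distinct keys spread evenly, the ratio would be expected to sit much closer to 1.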

Thanks,
Pawel

Re: Scheduling of GroupByKey and CombinePerKey operations

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

What are the other stages in that program?

Best,
Aljoscha

> On 18. Jan 2018, at 16:22, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Pawel,
> 
> This question might be better suited for the Beam user list.
> Beam includes the Beam Flink runner, which translates Beam programs into Flink programs.
> 
> Best, 
> Fabian


Re: Scheduling of GroupByKey and CombinePerKey operations

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Pawel,

This question might be better suited for the Beam user list.
Beam includes the Beam Flink runner, which translates Beam programs into
Flink programs.

Best,
Fabian
