Posted to user@flink.apache.org by Pawel Bartoszek <pa...@gmail.com> on 2018/01/18 15:02:33 UTC
Scheduling of GroupByKey and CombinePerKey operations
Can I ask why some operations run on only one slot? I understand that file
writes should happen on only one slot, but the GroupByKey operation could be
distributed across all slots. I have around 20k distinct keys every
minute. Is there any way to break this operator chain?
I noticed that CombinePerKey operations that don't have IO-related
transformations are scheduled across all 32 slots.
My cluster has 32 slots across 2 task managers. Running Beam 2.2 and Flink 1.3.2.
Task: GroupByKey ->
ParMultiDo(WriteShardedBundles) -> ParMultiDo(Anonymous) ->
xxx.pipeline.output.io.file.WriteWindowToFile-SumPlaybackBitrateResult2/TextIO.Write/WriteFiles/Reshuffle/Window.Into()/Window.Assign.out
-> ParMultiDo(ReifyValueTimestamp) -> ToKeyedWorkItem
Start Time: 2018-01-18, 13:56:28 | End Time: 2018-01-18, 14:37:14 | Duration: 40m 45s
Bytes received: 149 MB | Records received: 333,672 | Bytes sent: 70.8 MB | Records sent: 19 | Parallelism: 32 | Status: RUNNING

Per-subtask metrics:
Start Time           | Duration | Bytes received | Records received | Bytes sent | Records sent | Attempt | Host | Status
2018-01-18, 13:56:28 | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | 40m 45s  | 77.5 MB        | 333,683          | 2.21 MB    | 20           | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
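[Editorial note: as a quick sanity check on the key distribution, here is a generic hash-partitioning sketch in Python. This is not Beam's or Flink's actual partitioner, and the key names are made up; it only illustrates that ~20k distinct keys spread over 32 slots should land fairly evenly, so the single busy subtask above suggests the bottleneck is the operator chain's placement, not key skew.]

```python
import hashlib

NUM_SLOTS = 32      # slots in the cluster described above
NUM_KEYS = 20_000   # ~20k distinct keys per minute

# Assign each key to a slot the way a generic hash partitioner would.
counts = [0] * NUM_SLOTS
for k in range(NUM_KEYS):
    # MD5 gives a deterministic, well-mixed hash for this illustration.
    digest = hashlib.md5(f"key-{k}".encode()).hexdigest()
    counts[int(digest, 16) % NUM_SLOTS] += 1

# With even hashing, each slot receives roughly 20_000 / 32 = 625 keys.
print(f"min={min(counts)} max={max(counts)} avg={sum(counts) // NUM_SLOTS}")
```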
Thanks,
Pawel
Re: Scheduling of GroupByKey and CombinePerKey operations
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
What are the other stages in that program?
Best,
Aljoscha
> On 18. Jan 2018, at 16:22, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Pawel,
>
> This question might be better suited for the Beam user list.
> Beam includes the Beam Flink runner which translates Beam programs into Flink programs.
>
> Best,
> Fabian
>
> 2018-01-18 16:02 GMT+01:00 Pawel Bartoszek <pawelbartoszek89@gmail.com>:
> Can I ask why some operations run on only one slot? I understand that file writes should happen on only one slot, but the GroupByKey operation could be distributed across all slots. I have around 20k distinct keys every minute. Is there any way to break this operator chain?
>
> I noticed that CombinePerKey operations that don't have IO-related transformations are scheduled across all 32 slots.
>
>
> My cluster has 32 slots across 2 task managers. Running Beam 2.2 and Flink 1.3.2.
>
>
> Thanks,
> Pawel
>
Re: Scheduling of GroupByKey and CombinePerKey operations
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Pawel,
This question might be better suited for the Beam user list.
Beam includes the Beam Flink runner which translates Beam programs into
Flink programs.
Best,
Fabian
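[Editorial note: for context on the runner Fabian mentions, the Flink runner is selected through Beam pipeline options at submission time. A minimal sketch follows; the jar and main-class names are placeholders, not from this thread, and `--runner`/`--parallelism` are the standard FlinkPipelineOptions flags for Beam 2.x.]

```shell
# Submit a Beam pipeline to an existing Flink cluster via the Beam Flink runner.
# com.example.MyPipeline and my-pipeline-bundled.jar are hypothetical names.
flink run -c com.example.MyPipeline my-pipeline-bundled.jar \
  --runner=FlinkRunner \
  --parallelism=32   # match the 32 available slots
```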
2018-01-18 16:02 GMT+01:00 Pawel Bartoszek <pa...@gmail.com>:
> Can I ask why some operations run on only one slot? I understand that file
> writes should happen on only one slot, but the GroupByKey operation could be
> distributed across all slots. I have around 20k distinct keys every
> minute. Is there any way to break this operator chain?
>
> I noticed that CombinePerKey operations that don't have IO-related
> transformations are scheduled across all 32 slots.
>
>
> My cluster has 32 slots across 2 task managers. Running Beam 2.2 and
> Flink 1.3.2.
>
>
> Thanks,
> Pawel
>