You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Talat Uyarer via user <us...@beam.apache.org> on 2023/05/19 09:25:28 UTC

Local Combiner for GroupByKey on Flink Streaming jobs

Hi,

I have a stream aggregation job which is running on Flink 1.13 I generate
DAG by using Beam SQL. My SQL query has a TUMBLE window. Basically My
pipeline reads from kafka aggregate, counts/sums some values by streamin
aggregation and writes a Sink.

BeamSQl uses Groupbykey for the aggregation part. When I read the
translation code for Group By Key class in Flink Runner [1] I could not see
any local combiner. I see ReducerFunction but I feel it works on the
reducer side. If this is true. How can I implement a local reducer in
Source step to improve shuffling performance or Do I miss something?

If you need more information about my pipeline I share some below.

Thanks
[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L905


This is my SQL query : "SELECT log_source_id, SUM(size) AS total_size FROM
PCOLLECTION  GROUP BY log_source_id, TUMBLE(log_time, INTERVAL '1' MINUTE)"
When I submit the job Flink generates two fused steps Source -> Sink Step.
I shared the Task Name below.
First Step Source step:
Source:
Kafka_IO/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
->
Flat Map ->
ParMultiDo(AvroBytesToRowConverter) ->
BeamCalcRel_47/ParDo(Calc)/ParMultiDo(Calc) ->
BeamAggregationRel_48/assignEventTimestamp/AddTimestamps/ParMultiDo(AddTimestamps)
->
BeamAggregationRel_48/Window.Into()/Window.Assign.out ->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/selectKeys/AddKeys/Map/ParMultiDo(Anonymous)
->
ToBinaryKeyedWorkItem

Second Step is Aggregation and Sink Step:

BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/GroupByKey ->
ToGBKResult ->
BeamAggregationRel_48/Group.CombineFieldsByFields/Combine/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToRow/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/mergeRecord/ParMultiDo(Anonymous) ->
BeamCalcRel_49/ParDo(Calc)/ParMultiDo(Calc) ->
ParMultiDo(RowToOutputFormat) ->
ParMultiDo(SinkProcessor)

Local Combiner for GroupByKey on Flink Streaming jobs

Posted by Talat Uyarer via dev <de...@beam.apache.org>.
Sorry for cross posting

---------- Forwarded message ---------
From: Talat Uyarer <tu...@paloaltonetworks.com>
Date: Fri, May 19, 2023, 2:25 AM
Subject: Local Combiner for GroupByKey on Flink Streaming jobs
To: <us...@beam.apache.org>


Hi,

I have a stream aggregation job which is running on Flink 1.13 I generate
DAG by using Beam SQL. My SQL query has a TUMBLE window. Basically My
pipeline reads from kafka aggregate, counts/sums some values by streamin
aggregation and writes a Sink.

BeamSQl uses Groupbykey for the aggregation part. When I read the
translation code for Group By Key class in Flink Runner [1] I could not see
any local combiner. I see ReducerFunction but I feel it works on the
reducer side. If this is true. How can I implement a local reducer in
Source step to improve shuffling performance or Do I miss something?

If you need more information about my pipeline I share some below.

Thanks
[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L905


This is my SQL query : "SELECT log_source_id, SUM(size) AS total_size FROM
PCOLLECTION  GROUP BY log_source_id, TUMBLE(log_time, INTERVAL '1' MINUTE)"
When I submit the job Flink generates two fused steps Source -> Sink Step.
I shared the Task Name below.
First Step Source step:
Source:
Kafka_IO/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
->
Flat Map ->
ParMultiDo(AvroBytesToRowConverter) ->
BeamCalcRel_47/ParDo(Calc)/ParMultiDo(Calc) ->
BeamAggregationRel_48/assignEventTimestamp/AddTimestamps/ParMultiDo(AddTimestamps)
->
BeamAggregationRel_48/Window.Into()/Window.Assign.out ->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/selectKeys/AddKeys/Map/ParMultiDo(Anonymous)
->
ToBinaryKeyedWorkItem

Second Step is Aggregation and Sink Step:

BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/GroupByKey ->
ToGBKResult ->
BeamAggregationRel_48/Group.CombineFieldsByFields/Combine/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToRow/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/mergeRecord/ParMultiDo(Anonymous) ->
BeamCalcRel_49/ParDo(Calc)/ParMultiDo(Calc) ->
ParMultiDo(RowToOutputFormat) ->
ParMultiDo(SinkProcessor)