Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:08:29 UTC

[jira] [Commented] (BEAM-5330) Support zero-shuffle grouping operations

    [ https://issues.apache.org/jira/browse/BEAM-5330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174836#comment-17174836 ] 

Beam JIRA Bot commented on BEAM-5330:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Support zero-shuffle grouping operations
> ----------------------------------------
>
>                 Key: BEAM-5330
>                 URL: https://issues.apache.org/jira/browse/BEAM-5330
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-euphoria
>            Reporter: Jan Lukavský
>            Priority: P2
>              Labels: performance, stale-P2
>
> On some occasions the input dataset might already be correctly shuffled (e.g. as a result of previous operations), which means that a subsequent grouping operation could leverage this and skip the unneeded shuffle. Example (pseudocode):
> {code:java}
>  Dataset<Integer> input = ...
>  Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>    .keyBy(e -> e)
>    .windowBy( /* some small window */ )
>    .output();
>  Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>    .keyBy(Pair::getFirst)
>    .windowBy( /* larger window */ )
>    .output();
> {code}
> Now, the second {{ReduceByKey}} might already have the correct shuffle (depending on the runner), but it isn't able to leverage this, because it isn't aware that the grouping key has not changed from the previous operation.
> Proposed change:
> {code:java}
>  Dataset<Integer> input = ...
>  Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>    .keyBy(e -> e)
>    .windowBy( /* some small window */ )
>    .output();
>  Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>    .keyByLocally(Pair::getFirst)
>    .windowBy( /* larger window */ )
>    .output();
> {code}
> Introduce {{keyByLocally}} to keyed operations, which will tell the runner that the grouping is preserved from one keyed operator to the next.
> This will probably require some support on the Beam SDK side, because this information has to be passed to the runner (so that e.g. FlinkRunner can make use of something like {{DataStreamUtils#reinterpretAsKeyedStream}}).
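
As a rough illustration of why the second grouping needs no shuffle, here is a self-contained Java sketch that simulates the two stages from the description without using Beam or Euphoria. All names in it (ZeroShuffleSketch, partitionOf, shuffle, countByKey, sumByKeyLocally, run) are hypothetical, and the hash partitioner is only an assumed stand-in for whatever a real runner does. Each inner list passed to run() plays the role of one small window; the larger window is assumed to cover all of them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ZeroShuffleSketch {

    /** Hypothetical hash partitioner standing in for a runner's shuffle. */
    static int partitionOf(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    /** The only step that moves data: route each element to the partition owning its key. */
    static List<List<Integer>> shuffle(List<Integer> input, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int element : input) {
            partitions.get(partitionOf(element, numPartitions)).add(element);
        }
        return partitions;
    }

    /** Stage 1 (CountByKey analogue): count per key inside one partition, one small window. */
    static Map<Integer, Long> countByKey(List<Integer> partition) {
        Map<Integer, Long> counts = new HashMap<>();
        for (int key : partition) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    /** Stage 2 (SumByKey analogue): merge the small-window counts of ONE partition in place. */
    static Map<Integer, Long> sumByKeyLocally(List<Map<Integer, Long>> smallWindowCounts) {
        Map<Integer, Long> sums = new HashMap<>();
        for (Map<Integer, Long> counts : smallWindowCounts) {
            counts.forEach((key, count) -> sums.merge(key, count, Long::sum));
        }
        return sums;
    }

    /** Runs both stages; data is shuffled once, before stage 1, and never again. */
    static Map<Integer, Long> run(List<List<Integer>> smallWindows, int numPartitions) {
        // perPartition.get(p) collects the stage 1 output of every small window for partition p.
        List<List<Map<Integer, Long>>> perPartition = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            perPartition.add(new ArrayList<>());
        }
        for (List<Integer> window : smallWindows) {
            List<List<Integer>> shuffled = shuffle(window, numPartitions);
            for (int p = 0; p < numPartitions; p++) {
                perPartition.get(p).add(countByKey(shuffled.get(p)));
            }
        }
        Map<Integer, Long> result = new TreeMap<>();
        for (List<Map<Integer, Long>> local : perPartition) {
            // A key lives in exactly one partition, so putAll never overwrites an entry.
            result.putAll(sumByKeyLocally(local));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> windows = List.of(List.of(1, 2, 2, 3), List.of(2, 3, 3, 4));
        System.out.println(run(windows, 3)); // prints {1=1, 2=3, 3=3, 4=1}
    }
}
```

Because the grouping key is unchanged between the two stages, every count for a given key is already in the partition that stage 2 would route it to, so the per-partition sums are the final per-key sums. That is the property a hint such as the proposed keyByLocally would let a runner rely on.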



--
This message was sent by Atlassian Jira
(v8.3.4#803005)