Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:08:29 UTC

[jira] [Updated] (BEAM-5330) Support zero-shuffle grouping operations

     [ https://issues.apache.org/jira/browse/BEAM-5330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-5330:
--------------------------------
    Labels: performance stale-P2  (was: performance)

> Support zero-shuffle grouping operations
> ----------------------------------------
>
>                 Key: BEAM-5330
>                 URL: https://issues.apache.org/jira/browse/BEAM-5330
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-euphoria
>            Reporter: Jan Lukavský
>            Priority: P2
>              Labels: performance, stale-P2
>
> In some cases the input dataset is already correctly shuffled (e.g. as a result of previous operations), so a subsequent grouping operation could take advantage of this and skip the unneeded shuffle. Example (pseudocode):
> {code:java}
>  Dataset<Integer> input = ...
>  Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>    .keyBy(e -> e)
>    .windowBy( /* some small window */ )
>    .output();
>  Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>    .keyBy(Pair::getFirst)
>    .windowBy( /* larger window */ )
>    .output();
> {code}
> Here the second {{ReduceByKey}} might already have the correct shuffle (depending on the runner), but it cannot take advantage of that, because it is not aware that the grouping key has not changed since the previous operation.
> Proposed change:
> {code:java}
>  Dataset<Integer> input = ...
>  Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>    .keyBy(e -> e)
>    .windowBy( /* some small window */ )
>    .output();
>  Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>    .keyByLocally(Pair::getFirst)
>    .windowBy( /* larger window */ )
>    .output();
> {code}
> Introduce {{keyByLocally}} to keyed operations, which will tell the runner that the grouping is preserved from one keyed operator to the other.
> This will probably require some support on the Beam SDK side, because this information has to be passed to the runner (so that e.g. FlinkRunner can make use of something like {{DataStreamUtils#reinterpretAsKeyedStream}}).
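
To illustrate why the second grouping needs no shuffle when the key is unchanged, here is a minimal, runner-free sketch in plain Java. It is not Beam or Euphoria API: the class ZeroShuffleSketch, its methods, and the modelling of windows as integer division of a timestamp are all assumptions made for illustration. Stage 1 routes events to partitions by key and counts per small window; stage 2 sums those counts into larger windows inside each partition, moving no data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the idea; none of these names exist in Beam or Euphoria.
final class ZeroShuffleSketch {

  // Models the shuffle of a keyed operator: every key maps to exactly one partition.
  static int partitionOf(int key, int numPartitions) {
    return Math.floorMod(Integer.hashCode(key), numPartitions);
  }

  // Stage 1: shuffle events {key, timestamp} by key, then count per key and small window.
  // Timestamps are assumed non-negative.
  // Result: partition -> key -> small-window index -> count.
  static List<Map<Integer, Map<Integer, Long>>> countPerSmallWindow(
      List<int[]> events, int numPartitions, int smallWindowSize) {
    List<Map<Integer, Map<Integer, Long>>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new HashMap<>());
    }
    for (int[] event : events) {
      int key = event[0];
      int window = event[1] / smallWindowSize;
      partitions.get(partitionOf(key, numPartitions))
          .computeIfAbsent(key, k -> new HashMap<>())
          .merge(window, 1L, Long::sum);
    }
    return partitions;
  }

  // Stage 2 without a shuffle: the key is unchanged, so all records for a key already
  // sit in one partition and each partition can sum its own counts into larger windows.
  // Result: partition -> key -> large-window index -> sum.
  static List<Map<Integer, Map<Integer, Long>>> sumPerLargeWindowLocally(
      List<Map<Integer, Map<Integer, Long>>> partitions, int smallPerLarge) {
    List<Map<Integer, Map<Integer, Long>>> result = new ArrayList<>();
    for (Map<Integer, Map<Integer, Long>> partition : partitions) {
      Map<Integer, Map<Integer, Long>> sums = new HashMap<>();
      partition.forEach((key, counts) ->
          counts.forEach((smallWindow, count) ->
              sums.computeIfAbsent(key, k -> new HashMap<>())
                  .merge(smallWindow / smallPerLarge, count, Long::sum)));
      result.add(sums);
    }
    return result;
  }

  public static void main(String[] args) {
    List<int[]> events = List.of(
        new int[] {1, 0}, new int[] {1, 3}, new int[] {1, 12},
        new int[] {2, 5}, new int[] {2, 27}, new int[] {1, 45});
    // Small windows of 10 time units; each large window spans 4 small windows.
    List<Map<Integer, Map<Integer, Long>>> counts = countPerSmallWindow(events, 3, 10);
    System.out.println(sumPerLargeWindowLocally(counts, 4));
  }
}
```

The local sum is only correct because stage 2 groups by the same key as stage 1. If the key changed, records for one new key could be spread over several partitions and a real shuffle would be required, which is why the proposal makes the opt-in explicit via {{keyByLocally}}.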



--
This message was sent by Atlassian Jira
(v8.3.4#803005)