Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:08:29 UTC
[jira] [Updated] (BEAM-5330) Support zero-shuffle grouping operations
[ https://issues.apache.org/jira/browse/BEAM-5330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-5330:
--------------------------------
Labels: performance stale-P2 (was: performance)
> Support zero-shuffle grouping operations
> ----------------------------------------
>
> Key: BEAM-5330
> URL: https://issues.apache.org/jira/browse/BEAM-5330
> Project: Beam
> Issue Type: Improvement
> Components: dsl-euphoria
> Reporter: Jan Lukavský
> Priority: P2
> Labels: performance, stale-P2
>
> On some occasions the input dataset might already be correctly shuffled (e.g. as a result of previous operations), which means that a subsequent grouping operation could leverage this and skip the unneeded shuffle. Example (pseudocode):
> {code:java}
> Dataset<Integer> input = ...
> // First grouping: shuffles by the element itself.
> Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>     .keyBy(e -> e)
>     .windowBy( /* some small window */ )
>     .output();
> // Second grouping: the key is still the original element, yet this shuffles again.
> Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>     .keyBy(Pair::getFirst)
>     .windowBy( /* larger window */ )
>     .output();
> {code}
> Now, the second {{ReduceByKey}} might already have the correct shuffle (depending on the runner), but it cannot leverage this, because it is not aware that the grouping key has not changed since the previous operation.
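
The redundancy described above can be illustrated with a toy model in plain Java. This is only a sketch under stated assumptions: the class ShuffleReuseSketch, its hash partitioner, and the list-of-workers representation are made up for illustration and are not Beam or Euphoria code, and windowing is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a hash shuffle; hypothetical, not Beam or Euphoria code.
class ShuffleReuseSketch {

  // Assumed partitioner: decides which worker owns a given key.
  static int partitionOf(int key, int parallelism) {
    return Math.floorMod(Integer.hashCode(key), parallelism);
  }

  // Routes every element to the worker that owns its key (the "shuffle").
  static List<List<Integer>> shuffle(List<Integer> input, int parallelism) {
    List<List<Integer>> workers = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      workers.add(new ArrayList<>());
    }
    for (int element : input) {
      workers.get(partitionOf(element, parallelism)).add(element);
    }
    return workers;
  }

  // First grouping: counts elements per key on each worker (windowing omitted).
  static List<Map<Integer, Long>> countByKey(List<List<Integer>> workers) {
    List<Map<Integer, Long>> counts = new ArrayList<>();
    for (List<Integer> local : workers) {
      Map<Integer, Long> perKey = new TreeMap<>();
      for (int key : local) {
        perKey.merge(key, 1L, Long::sum);
      }
      counts.add(perKey);
    }
    return counts;
  }

  // Counts the records that a second shuffle on the same key would send to a different worker.
  static int recordsMovedBySecondShuffle(List<Map<Integer, Long>> counts) {
    int moved = 0;
    for (int worker = 0; worker < counts.size(); worker++) {
      for (int key : counts.get(worker).keySet()) {
        if (partitionOf(key, counts.size()) != worker) {
          moved++;
        }
      }
    }
    return moved;
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3, 1, 2, 1, 7, 8, 7);
    List<Map<Integer, Long>> counts = countByKey(shuffle(input, 4));
    System.out.println("records moved by second shuffle: " + recordsMovedBySecondShuffle(counts));
  }
}
```

In this model every (key, count) pair already sits on the worker that owns its key, so a second shuffle on the same key with the same partitioner and parallelism moves nothing. Whether a real runner keeps that property between two operators is exactly the "depends on runner" caveat above.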
> Proposed change:
> {code:java}
> Dataset<Integer> input = ...
> Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>     .keyBy(e -> e)
>     .windowBy( /* some small window */ )
>     .output();
> // keyByLocally (proposed): the key is unchanged, so the runner may reuse the existing shuffle.
> Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>     .keyByLocally(Pair::getFirst)
>     .windowBy( /* larger window */ )
>     .output();
> {code}
> Introduce {{keyByLocally}} to keyed operations, which will tell the runner that the grouping is preserved from one keyed operator to the next.
> This will probably require some support on the Beam SDK side, because this information has to be passed to the runner (so that e.g. FlinkRunner can make use of something like {{DataStreamUtils#reinterpretAsKeyedStream}}).
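
One way to picture how such a hint could travel from the DSL to a runner is sketched below in plain Java. Everything here is hypothetical: KeyByLocallySketch, KeyedOperator, the Grouping enum, and plan are invented for illustration and do not exist in Beam or Euphoria. The sketch also assumes that a runner without support for reusing the partitioning simply falls back to a regular shuffle, which the issue itself does not specify.

```java
import java.util.function.Function;

// Hypothetical sketch; none of these types exist in Beam or Euphoria.
class KeyByLocallySketch {

  // How the operator expects its input to be distributed before grouping.
  enum Grouping { SHUFFLE, PRESERVED }

  // Minimal stand-in for the description of a keyed operator handed to a runner.
  static final class KeyedOperator<InT, KeyT> {
    final Function<InT, KeyT> keyExtractor;
    final Grouping grouping;

    private KeyedOperator(Function<InT, KeyT> keyExtractor, Grouping grouping) {
      this.keyExtractor = keyExtractor;
      this.grouping = grouping;
    }

    // Regular keyBy: the runner must shuffle by the extracted key.
    static <InT, KeyT> KeyedOperator<InT, KeyT> keyBy(Function<InT, KeyT> keyExtractor) {
      return new KeyedOperator<>(keyExtractor, Grouping.SHUFFLE);
    }

    // Proposed keyByLocally: the user asserts the input is already grouped by this key.
    static <InT, KeyT> KeyedOperator<InT, KeyT> keyByLocally(Function<InT, KeyT> keyExtractor) {
      return new KeyedOperator<>(keyExtractor, Grouping.PRESERVED);
    }
  }

  // Toy runner decision: reuse the partitioning only if hinted and supported (assumed fallback).
  static String plan(KeyedOperator<?, ?> operator, boolean runnerSupportsReuse) {
    if (operator.grouping == Grouping.PRESERVED && runnerSupportsReuse) {
      return "reuse existing partitioning";
    }
    return "shuffle by key";
  }

  public static void main(String[] args) {
    KeyedOperator<String, Integer> first = KeyedOperator.keyBy(String::length);
    KeyedOperator<String, Integer> second = KeyedOperator.keyByLocally(String::length);
    System.out.println(plan(first, true));
    System.out.println(plan(second, true));
    System.out.println(plan(second, false));
  }
}
```

In a Flink-based runner, the "reuse existing partitioning" branch is where something like {{DataStreamUtils#reinterpretAsKeyedStream}} might be applied. Correctness of the hint rests on the user's assertion that the key really is unchanged.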