Posted to issues@beam.apache.org by "Etienne Chauchot (JIRA)" <ji...@apache.org> on 2019/04/05 10:19:00 UTC

[jira] [Comment Edited] (BEAM-6740) Combine.globally translation is never called

    [ https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777945#comment-16777945 ] 

Etienne Chauchot edited comment on BEAM-6740 at 4/5/19 10:18 AM:
-----------------------------------------------------------------

No, it was changed in this PR: https://github.com/apache/beam/pull/7005/files. The Spark translation is no longer based on the class but on the URN. Previously, in Spark, the class was added directly to the Spark translators map, and the translation worked fine.

The problem seems more general than the Spark runner alone because, in runner-construction, there is only a Combine.perKey PayloadTranslator; see https://github.com/apache/beam/blob/2fbc8c0c2db3a0798e7cca0d30c3a7eec855b375/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java#L57. I fear the impact of adding a new CombineTranslation for Combine.globally on the runner-construction side. That being said, you're right: only Spark has a direct translation of Combine.Globally; all the other runners use only Combine.perKey.
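
To illustrate the class-versus-URN difference described above, here is a toy lookup in plain Java. Everything in it is hypothetical: the class names, URN strings, and translator labels are stand-ins for illustration and are not Beam or Spark runner APIs. It only assumes, as stated above, that a URN is registered for the per-key combine and none for the global one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class TranslatorLookupSketch {
  // Hypothetical stand-ins for transforms; these are not Beam classes.
  public interface Transform {}
  public static final class CombineGlobally implements Transform {}
  public static final class CombinePerKey implements Transform {}

  // Old style: translators registered directly by transform class.
  static final Map<Class<?>, String> BY_CLASS = new HashMap<>();
  // Assumed URN registry: only the per-key combine has a URN.
  static final Map<Class<?>, String> URN_OF = new HashMap<>();
  // New style: translators registered by URN.
  static final Map<String, String> BY_URN = new HashMap<>();

  static {
    BY_CLASS.put(CombineGlobally.class, "globally-translator");
    BY_CLASS.put(CombinePerKey.class, "per-key-translator");
    URN_OF.put(CombinePerKey.class, "urn:example:combine_per_key");
    BY_URN.put("urn:example:combine_per_key", "per-key-translator");
    // Registered, but unreachable: no transform class maps to this URN.
    BY_URN.put("urn:example:combine_globally", "globally-translator");
  }

  public static Optional<String> lookupByClass(Transform t) {
    return Optional.ofNullable(BY_CLASS.get(t.getClass()));
  }

  public static Optional<String> lookupByUrn(Transform t) {
    // No URN for the transform means no translator is found, so a runner
    // would fall back to translating the composite's inner transforms.
    return Optional.ofNullable(URN_OF.get(t.getClass())).map(BY_URN::get);
  }

  public static void main(String[] args) {
    Transform globally = new CombineGlobally();
    System.out.println("by class: " + lookupByClass(globally).isPresent());
    System.out.println("by URN:   " + lookupByUrn(globally).isPresent());
  }
}
```

In this sketch the global translator is still registered, yet the URN-based lookup never reaches it, which is the "dead code" situation discussed in this issue.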

I took a look at the performance of Nexmark query 7 (which uses Combine.globally), and it seems the same in Spark before and after deactivating the special Combine.globally translation (with Ismael's PR). IMHO, I would recommend leaving the Combine.Globally dead code in place in case we need it in the future (we will see whether it is needed with more in-depth performance tests). What is certain is that, for now, I will deactivate URN lookup in the new POC Spark runner, for which not having a Combine.globally translation would harm performance.
WDYT?





> Combine.globally translation is never called
> --------------------------------------------
>
>                 Key: BEAM-6740
>                 URL: https://issues.apache.org/jira/browse/BEAM-6740
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Etienne Chauchot
>            Assignee: Ismaël Mejía
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Beam translates Combine.Globally as a composite transform composed of:
>  * Map that assigns Void keys
>  * Combine.PerKey
> On Spark: as Combine.PerKey uses a Spark GBK internally, the runner adds its own translation of Combine.Globally to avoid the less performant GBK. This translation should be called instead of descending into the composite transform's translation. A pipeline like this:
> {code:java}
> PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(
>  Combine.globally(new IntegerCombineFn()));
> {code}
> {code:java}
>   private static class IntegerCombineFn extends Combine.CombineFn<Integer, Integer, Integer> {
>     @Override
>     public Integer createAccumulator() {
>       return 0;
>     }
>     @Override
>     public Integer addInput(Integer accumulator, Integer input) {
>       return accumulator + input;
>     }
>     @Override
>     public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>       Integer result = 0;
>       for (Integer value : accumulators) {
>         result += value;
>       }
>       return result;
>     }
>     @Override
>     public Integer extractOutput(Integer accumulator) {
>       return accumulator;
>     }
>   }
> {code}
> is translated as the composite described above.
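
For readers unfamiliar with the composite described in the issue, the two steps (assign Void keys, then combine per key) can be sketched in plain Java without any Beam dependency. This is only an illustration of the idea: the class and method names are hypothetical, a `HashMap` stands in for the grouping step, and a simple integer sum replaces the `IntegerCombineFn` from the description.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GloballyAsPerKeySketch {
  // Step 1: pair every element with the same Void (null) key.
  static List<Map.Entry<Void, Integer>> withVoidKeys(List<Integer> input) {
    List<Map.Entry<Void, Integer>> keyed = new ArrayList<>();
    for (Integer value : input) {
      keyed.add(new AbstractMap.SimpleEntry<Void, Integer>(null, value));
    }
    return keyed;
  }

  // Step 2: combine per key; here the combine function is an integer sum.
  static Map<Void, Integer> sumPerKey(List<Map.Entry<Void, Integer>> keyed) {
    Map<Void, Integer> sums = new HashMap<>();
    for (Map.Entry<Void, Integer> entry : keyed) {
      sums.merge(entry.getKey(), entry.getValue(), Integer::sum);
    }
    return sums;
  }

  // The "global" combine is the per-key combine over the single null key.
  static int sumGlobally(List<Integer> input) {
    return sumPerKey(withVoidKeys(input)).getOrDefault(null, 0);
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    System.out.println(sumGlobally(input));
  }
}
```

Because every element shares one key, the per-key path routes all data through a single group, which is why a dedicated global translation can be attractive on a runner where the grouping step is expensive.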


