You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2018/10/11 17:58:00 UTC

[jira] [Commented] (BEAM-5098) Combine.Globally::asSingletonView clears side inputs

    [ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646829#comment-16646829 ] 

Kenneth Knowles commented on BEAM-5098:
---------------------------------------

[~mpedersen] this one might be a quick fix. Would you be interested in contributing, perhaps?

> Combine.Globally::asSingletonView clears side inputs
> ----------------------------------------------------
>
>                 Key: BEAM-5098
>                 URL: https://issues.apache.org/jira/browse/BEAM-5098
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.5.0
>            Reporter: Mike Pedersen
>            Assignee: Kenneth Knowles
>            Priority: Critical
>              Labels: beginner, starter
>
> It seems like calling .asSingletonView on Combine.Globally clears all side inputs. Take this code for example:
>  
> {code:java}
> public class Main {
>     public static void main(String[] args) {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>         PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
>         PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
>         a
>                 .apply(Combine.globally(new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
>                     @Override
>                     public Integer createAccumulator(CombineWithContext.Context c) {
>                         return c.sideInput(b);
>                     }
>                     @Override
>                     public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
>                         return accumulator + input;
>                     }
>                     @Override
>                     public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
>                         int sum = 0;
>                         for (int i : accumulators) {
>                             sum += i;
>                         }
>                         return sum;
>                     }
>                     @Override
>                     public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
>                         return accumulator;
>                     }
>                     @Override
>                     public Integer defaultValue() {
>                         return 0;
>                     }
>                 }).withSideInputs(b).asSingletonView());
>         p.run().waitUntilFinish();
>     }
> }{code}
> This fails with the following exception:
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>     at Main.main(Main.java:287)
> Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
>     at Main$1.createAccumulator(Main.java:258)
>     at Main$1.createAccumulator(Main.java:255)
>     at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
>     at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129){code}
> But if you change
> {code:java}
> .withSideInputs(b).asSingletonView()){code}
> to
> {code:java}
> .withSideInputs(b)).apply(View.asSingleton()){code}
> then it works just fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)