You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2016/12/13 22:09:58 UTC
[jira] [Updated] (BEAM-1149) Side input access fails in direct
runner (possibly others too) when input element in multiple windows
[ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-1149:
----------------------------------
Assignee: Eugene Kirpichov
> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
> Key: BEAM-1149
> URL: https://issues.apache.org/jira/browse/BEAM-1149
> Project: Beam
> Issue Type: Bug
> Reporter: Eugene Kirpichov
> Assignee: Eugene Kirpichov
> Priority: Blocker
>
> {code:java}
> private static class FnWithSideInputs extends DoFn<String, String> {
> private final PCollectionView<Integer> view;
> private FnWithSideInputs(PCollectionView<Integer> view) {
> this.view = view;
> }
> @ProcessElement
> public void processElement(ProcessContext c) {
> c.output(c.element() + ":" + c.sideInput(view));
> }
> }
> @Test
> public void testSideInputsWithMultipleWindows() {
> Pipeline p = TestPipeline.create();
> MutableDateTime mutableNow = Instant.now().toMutableDateTime();
> mutableNow.setMillisOfSecond(0);
> Instant now = mutableNow.toInstant();
> SlidingWindows windowFn =
> SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
> PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
> PCollection<String> res =
> p.apply(Create.timestamped(TimestampedValue.of("a", now)))
> .apply(Window.<String>into(windowFn))
> .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
> PAssert.that(res).containsInAnyOrder("a:1");
> p.run();
> }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> at ....
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
> at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)