Posted to commits@beam.apache.org by "Eugene Kirpichov (JIRA)" <ji...@apache.org> on 2016/12/13 22:05:58 UTC
[jira] [Created] (BEAM-1149) Side input access fails in direct
runner (possibly others too) when input element in multiple windows
Eugene Kirpichov created BEAM-1149:
--------------------------------------
Summary: Side input access fails in direct runner (possibly others too) when input element in multiple windows
Key: BEAM-1149
URL: https://issues.apache.org/jira/browse/BEAM-1149
Project: Beam
Issue Type: Bug
Reporter: Eugene Kirpichov
Priority: Blocker
{code:java}
private static class FnWithSideInputs extends DoFn<String, String> {
  private final PCollectionView<Integer> view;

  private FnWithSideInputs(PCollectionView<Integer> view) {
    this.view = view;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element() + ":" + c.sideInput(view));
  }
}

@Test
public void testSideInputsWithMultipleWindows() {
  Pipeline p = TestPipeline.create();

  MutableDateTime mutableNow = Instant.now().toMutableDateTime();
  mutableNow.setMillisOfSecond(0);
  Instant now = mutableNow.toInstant();

  SlidingWindows windowFn =
      SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
  PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
  PCollection<String> res =
      p.apply(Create.timestamped(TimestampedValue.of("a", now)))
          .apply(Window.<String>into(windowFn))
          .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));

  PAssert.that(res).containsInAnyOrder("a:1");

  p.run();
}
{code}
This fails with the following exception:
{code}
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
	at ....
Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
{code}
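
For context on why the element "a" ends up in multiple windows: with 5-second sliding windows that start every 1 second, any single timestamp falls into several overlapping windows. The following is a minimal, Beam-free sketch of that assignment arithmetic. It is an illustration only, not the runner's code; the class name SlidingWindowSketch and the method windowStartsFor are hypothetical, and a window offset of 0 is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: lists the sliding windows that contain a timestamp.
class SlidingWindowSketch {

    /**
     * Returns the start times (in ms) of every window [start, start + size)
     * that contains the given timestamp, newest window first.
     * Assumes windows start at multiples of the period (offset 0).
     */
    static List<Long> windowStartsFor(long timestampMillis, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Arbitrary timestamp aligned to a whole second, like "now" in the test above.
        long now = 1_000_000_000_000L;
        List<Long> starts = windowStartsFor(now, 5_000L, 1_000L);
        System.out.println(starts.size() + " windows contain the element");
        for (long start : starts) {
            System.out.println("[" + start + ", " + (start + 5_000L) + ")");
        }
    }
}
```

With the parameters from the test (size 5 s, period 1 s), the sketch yields five windows for the single element, which matches the "main input element is in multiple windows" condition reported in the exception.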