Posted to commits@beam.apache.org by "Eugene Kirpichov (JIRA)" <ji...@apache.org> on 2016/12/13 22:05:58 UTC

[jira] [Created] (BEAM-1149) Side input access fails in direct runner (possibly others too) when input element in multiple windows

Eugene Kirpichov created BEAM-1149:
--------------------------------------

             Summary: Side input access fails in direct runner (possibly others too) when input element in multiple windows
                 Key: BEAM-1149
                 URL: https://issues.apache.org/jira/browse/BEAM-1149
             Project: Beam
          Issue Type: Bug
            Reporter: Eugene Kirpichov
            Priority: Blocker


{code:java}
  // DoFn that appends the side-input value to each main-input element.
  private static class FnWithSideInputs extends DoFn<String, String> {
    private final PCollectionView<Integer> view;

    private FnWithSideInputs(PCollectionView<Integer> view) {
      this.view = view;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element() + ":" + c.sideInput(view));
    }
  }

  @Test
  public void testSideInputsWithMultipleWindows() {
    Pipeline p = TestPipeline.create();

    // Truncate the current time to a whole second.
    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
    mutableNow.setMillisOfSecond(0);
    Instant now = mutableNow.toInstant();

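    // 5-second windows sliding every 1 second: each element lands in 5 overlapping windows.
    SlidingWindows windowFn =
        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
    // Singleton side input holding the value 1.
    PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());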
    PCollection<String> res =
        p.apply(Create.timestamped(TimestampedValue.of("a", now)))
            .apply(Window.<String>into(windowFn))
            .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));

    PAssert.that(res).containsInAnyOrder("a:1");

    p.run();
  }
{code}

This fails with the following exception:
{code}

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows

	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
	at ....
Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
{code}
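
For context on why the main input element in the test is in multiple windows: with 5-second windows sliding every 1 second, a single timestamp falls into five overlapping windows. The following is a minimal sketch of that arithmetic in plain Java, not Beam's implementation. It assumes windows start at multiples of the period (zero offset), and `SlidingWindowSketch` and `windowsFor` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
  /**
   * Returns the [start, end) bounds, in milliseconds, of every sliding window
   * that contains timestampMs. Assumes window starts are multiples of periodMs.
   */
  static List<long[]> windowsFor(long timestampMs, long sizeMs, long periodMs) {
    List<long[]> windows = new ArrayList<>();
    // Latest window start that is at or before the timestamp.
    long lastStart = timestampMs - Math.floorMod(timestampMs, periodMs);
    // Walk backwards one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
      windows.add(new long[] {start, start + sizeMs});
    }
    return windows;
  }

  public static void main(String[] args) {
    // Mirrors the test's windowing: 5-second windows every 1 second,
    // with a timestamp that sits on a whole second.
    for (long[] w : windowsFor(10_000L, 5_000L, 1_000L)) {
      System.out.println("[" + w[0] + ", " + w[1] + ")");
    }
  }
}
```

Under these assumptions the element "a" belongs to five windows at once, which is the condition the `IllegalStateException` in the trace above complains about.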


