Posted to commits@beam.apache.org by "Przemyslaw Pastuszka (JIRA)" <ji...@apache.org> on 2018/10/08 08:35:00 UTC

[jira] [Created] (BEAM-5673) Direct java runner crashes when using both timers and side input

Przemyslaw Pastuszka created BEAM-5673:
------------------------------------------

             Summary: Direct java runner crashes when using both timers and side input
                 Key: BEAM-5673
                 URL: https://issues.apache.org/jira/browse/BEAM-5673
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.6.0
            Reporter: Przemyslaw Pastuszka
            Assignee: Kenneth Knowles


I'm trying to write a ParDo that uses both a Timer and a side input, but running it with {{beam-runners-direct-java}} crashes with an {{IllegalArgumentException}} at [https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L167]. At that line the ParDo has two inputs (the main PCollection and the side input), while the code expects exactly one. This looks like a bug in the implementation.
 
Here's the code that reproduces the issue:
{code:java}
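import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class TestCrashesForTimerAndSideInput {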
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    private static class DoFnWithTimer extends DoFn<KV<String, String>, String> {
        @TimerId("t")
        private final TimerSpec tSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
        private final PCollectionView<Map<String, String>> sideInput;

        private DoFnWithTimer(PCollectionView<Map<String, String>> sideInput) {
            this.sideInput = sideInput;
        }

        @ProcessElement
        public void processElement(ProcessContext c, @TimerId("t") Timer t) {
            KV<String, String> element = c.element();
            // Look up the side-input value by the element's key.
            c.output(element.getKey() + c.sideInput(sideInput).get(element.getKey()));
            t.offset(Duration.standardSeconds(1)).setRelative();
        }

        @OnTimer("t")
        public void onTimerFire(OnTimerContext x) {
            x.output("Timer fired");
        }
    }

    @Test
    public void testCrashesForTimerAndSideInput() {
        ImmutableMap<String, String> sideData = ImmutableMap.<String, String>builder().
                put("x", "X").
                put("y", "Y").
                build();

        PCollectionView<Map<String, String>> sideInput =
                p.apply(Create.of(sideData)).apply(View.asMap());

        TestStream<String> testStream = TestStream.create(StringUtf8Coder.of()).
                addElements("x").
                advanceProcessingTime(Duration.standardSeconds(1)).
                addElements("y").
                advanceProcessingTime(Duration.standardSeconds(1)).
                advanceWatermarkToInfinity();

        PCollection<String> result = p.
                apply(testStream).
                apply(MapElements.into(kvs(strings(), strings())).via(v -> KV.of(v, v))).
                apply(ParDo.of(new DoFnWithTimer(sideInput)).withSideInputs(sideInput));

        PAssert.that(result).containsInAnyOrder("xX", "yY", "Timer fired");
        p.run();
    }

}
{code}
 
The error is:
{code}
java.lang.IllegalArgumentException: expected one element but was: <ParDo(DoFnWithTimer)/ParMultiDo(DoFnWithTimer)/To KeyedWorkItem/ParMultiDo(ToKeyedWorkItem).output [PCollection], View.AsMap/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization).output [PCollection]>

	at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
	at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
	at org.apache.beam.runners.direct.QuiescenceDriver.fireTimers(QuiescenceDriver.java:167)
	at org.apache.beam.runners.direct.QuiescenceDriver.drive(QuiescenceDriver.java:110)
	at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$2.run(ExecutorServiceParallelExecutor.java:170)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

{code}
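
The top frames of the trace point at Guava's {{Iterables.getOnlyElement}}, which rejects any iterable that does not hold exactly one element. The sketch below is only an illustration of that failure mode, not the runner's code: {{getOnlyElement}} here is a hypothetical stand-in written against the JDK alone, and the two strings are placeholders for the main input and the side input that the ParDo ends up with.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class OnlyElementSketch {
    // Hypothetical stand-in for Guava's Iterables.getOnlyElement (an assumption
    // for illustration): returns the single element, throws NoSuchElementException
    // for an empty iterable and IllegalArgumentException for more than one element.
    static <T> T getOnlyElement(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException();
        }
        T first = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("expected one element but was: " + items);
        }
        return first;
    }

    public static void main(String[] args) {
        // A transform with only a main input: exactly one element, so this succeeds.
        List<String> mainOnly = Collections.singletonList("main input");
        System.out.println(getOnlyElement(mainOnly));

        // A transform with a main input and a side input: two elements, so this throws.
        List<String> mainAndSide = Arrays.asList("main input", "side input");
        try {
            getOnlyElement(mainAndSide);
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

If this reading of the trace is right, any stateful or timer-using ParDo with at least one side input would hit the same check when a timer fires on the direct runner.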


