Posted to commits@beam.apache.org by "Przemyslaw Pastuszka (JIRA)" <ji...@apache.org> on 2018/10/08 08:35:00 UTC
[jira] [Created] (BEAM-5673) Direct java runner crashes when using both timers and side input
Przemyslaw Pastuszka created BEAM-5673:
------------------------------------------
Summary: Direct java runner crashes when using both timers and side input
Key: BEAM-5673
URL: https://issues.apache.org/jira/browse/BEAM-5673
Project: Beam
Issue Type: Bug
Components: beam-model
Affects Versions: 2.6.0
Reporter: Przemyslaw Pastuszka
Assignee: Kenneth Knowles
I'm trying to write a ParDo that uses both a Timer and a side input, but running it with {{beam-runners-direct-java}} crashes with an {{IllegalArgumentException}} at [https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L167]. The ParDo actually has two inputs (the main PCollection and the side input), while the runner expects exactly one. This looks like a bug in the implementation.
Here's the code that reproduces the issue:
{code:java}
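import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class TestCrashesForTimerAndSideInput {
  @Rule
  public final transient TestPipeline p = TestPipeline.create();

  // Stateful DoFn combining a per-key processing-time timer with a Map side input.
  private static class DoFnWithTimer extends DoFn<KV<String, String>, String> {
    @TimerId("t")
    private final TimerSpec tSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
    private final PCollectionView<Map<String, String>> sideInput;

    private DoFnWithTimer(PCollectionView<Map<String, String>> sideInput) {
      this.sideInput = sideInput;
    }

    @ProcessElement
    public void processElement(ProcessContext c, @TimerId("t") Timer t) {
      KV<String, String> element = c.element();
      // Look up the side-input map by the element's String key, not by the KV itself.
      c.output(element.getKey() + c.sideInput(sideInput).get(element.getKey()));
      // Schedule the timer one second of processing time from now.
      t.offset(Duration.standardSeconds(1)).setRelative();
    }

    @OnTimer("t")
    public void onTimerFire(OnTimerContext x) {
      x.output("Timer fired");
    }
  }

  @Test
  public void testCrashesForTimerAndSideInput() {
    // Side input: Map {x -> X, y -> Y}.
    ImmutableMap<String, String> sideData =
        ImmutableMap.<String, String>builder().put("x", "X").put("y", "Y").build();
    PCollectionView<Map<String, String>> sideInput =
        p.apply(Create.of(sideData)).apply(View.asMap());

    TestStream<String> testStream =
        TestStream.create(StringUtf8Coder.of())
            .addElements("x")
            .advanceProcessingTime(Duration.standardSeconds(1))
            .addElements("y")
            .advanceProcessingTime(Duration.standardSeconds(1))
            .advanceWatermarkToInfinity();

    PCollection<String> result =
        p.apply(testStream)
            // Key each element by itself so the stateful ParDo receives KV input.
            .apply(MapElements.into(kvs(strings(), strings())).via((String v) -> KV.of(v, v)))
            .apply(ParDo.of(new DoFnWithTimer(sideInput)).withSideInputs(sideInput));

    PAssert.that(result).containsInAnyOrder("xX", "yY", "Timer fired");
    p.run();
  }
}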
{code}
The resulting error is:
{code}
java.lang.IllegalArgumentException: expected one element but was: <ParDo(DoFnWithTimer)/ParMultiDo(DoFnWithTimer)/To KeyedWorkItem/ParMultiDo(ToKeyedWorkItem).output [PCollection], View.AsMap/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization).output [PCollection]>
at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
at org.apache.beam.runners.direct.QuiescenceDriver.fireTimers(QuiescenceDriver.java:167)
at org.apache.beam.runners.direct.QuiescenceDriver.drive(QuiescenceDriver.java:110)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$2.run(ExecutorServiceParallelExecutor.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
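The stack trace shows {{QuiescenceDriver.fireTimers}} calling {{Iterables.getOnlyElement}} on the transform's inputs, which fails as soon as a side input is attached. The sketch below is a hypothetical, simplified model of that failure and of one possible way around it: discard side-input tags first, then require exactly one remaining main input. The class and method names ({{TimerInputSketch}}, {{getOnlyElement}}, {{mainInput}}) and the representation of inputs as a tag-to-name map are assumptions for illustration only; this is neither the direct runner's code nor its actual fix.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical, simplified model of the failure; not Beam's actual code or fix.
// Inputs are modelled as a map from an assumed tag to a PCollection name.
public class TimerInputSketch {

  // Same contract as Guava's Iterables.getOnlyElement: exactly one element,
  // otherwise NoSuchElementException (empty) or IllegalArgumentException (2+).
  public static <T> T getOnlyElement(Iterable<T> items) {
    Iterator<T> it = items.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException();
    }
    T first = it.next();
    if (it.hasNext()) {
      T second = it.next();
      String more = it.hasNext() ? ", ..." : "";
      throw new IllegalArgumentException(
          "expected one element but was: <" + first + ", " + second + more + ">");
    }
    return first;
  }

  // Hypothetical alternative: drop side-input tags first, then require one main input.
  public static String mainInput(Map<String, String> inputsByTag, Set<String> sideInputTags) {
    List<String> mains = new ArrayList<>();
    for (Map.Entry<String, String> e : inputsByTag.entrySet()) {
      if (!sideInputTags.contains(e.getKey())) {
        mains.add(e.getValue());
      }
    }
    return getOnlyElement(mains);
  }

  public static void main(String[] args) {
    Map<String, String> inputs = new LinkedHashMap<>();
    inputs.put("main", "ToKeyedWorkItem.output");
    inputs.put("side", "VoidKeyToMultimapMaterialization.output");

    try {
      // Mirrors the single-input assumption seen in the stack trace.
      getOnlyElement(inputs.values());
    } catch (IllegalArgumentException e) {
      System.out.println("naive lookup fails: " + e.getMessage());
    }
    System.out.println("main input: " + mainInput(inputs, Collections.singleton("side")));
  }
}
{code}

Running {{main}} prints the naive lookup's {{IllegalArgumentException}} message (same shape as the one above) followed by {{main input: ToKeyedWorkItem.output}}. Whether the real runner can tell side inputs apart from the main input this way is an assumption of the sketch.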
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)