You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/04/14 17:20:00 UTC

[jira] [Updated] (BEAM-11971) Direct Runner State is null while active timers exist

     [ https://issues.apache.org/jira/browse/BEAM-11971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-11971:
---------------------------------
    Labels: stale-assigned  (was: )

> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
>                 Key: BEAM-11971
>                 URL: https://issues.apache.org/jira/browse/BEAM-11971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Reza ardeshir rokni
>            Assignee: Kenneth Knowles
>            Priority: P2
>              Labels: stale-assigned
>
> State is set to null while an active timer is present; this issue does not show up in other runners.
>  
> The following example will reach the IllegalStateException within 10-20 runs. LOOP_COUNT does not seem to be a factor, as it reproduces with a LOOP_COUNT of either 100 or 100000. The number of keys is a factor: it did not reproduce with only one key. I have not tried with more than 3 keys to see if it's easier to reproduce. 
> /**
>  * Runs failingTest() in an endless loop, printing the iteration count after each
>  * completed run. The loop has no exit condition, so it only stops when a run throws.
>  */
> public void testToFailure() throws Exception {
> int count = 0;
> while(true){
> failingTest();
> System.out.println(String.format("Got to Count %s", String.valueOf(count++)));
> }
> }
> /**
>  * Builds and runs one pipeline: a TestStream emitting the elements 1, 2 and 3 with
>  * watermark advances in between, each element keyed by its own value, fed into
>  * TestToFail.
>  */
> public void failingTest() throws Exception {
> Instant now = Instant.now();
> TestStream<Integer> stream =
> TestStream.create(BigEndianIntegerCoder.of())
> .addElements(1)
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(2)
> // Same watermark target as the previous advance (now + 1s).
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(3)
> .advanceWatermarkToInfinity();
> // NOTE(review): `p` is not declared in this snippet; presumably a pipeline field of the enclosing test class.
> p.apply(stream)
> // Key each element by itself, so the three elements end up under three distinct keys.
> .apply(WithKeys.of(x -> x))
> .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()))
> .apply(new TestToFail());
> p.run();
> }
> /** Composite transform that applies the stateful LoopingRead DoFn to the keyed input. */
> public static class TestToFail
> extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> {
> @Override
> public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) {
> return input.apply(ParDo.of(new LoopingRead()));
> }
> }
> /**
>  * Stateful DoFn used to reproduce the bug. Each element writes the "value" state and
>  * sets an event-time timer; each timer firing reads that state, updates it, and
>  * re-arms the timer until the stored count reaches LOOP_COUNT. Nothing is ever
>  * emitted to the output.
>  */
> public static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {
> // onTimer stops re-arming the timer once the stored count reaches this value.
> static int LOOP_COUNT = 100;
> // Last value written for the key; written in processElement and updated in onTimer.
> @StateId("value")
> private final StateSpec<ValueState<Integer>> value =
> StateSpecs.value(BigEndianIntegerCoder.of());
> // Number of timer firings recorded so far; only touched in onTimer.
> @StateId("count")
> private final StateSpec<ValueState<Integer>> count =
> StateSpecs.value(BigEndianIntegerCoder.of());
> // Event-time timer that drives the loop.
> @TimerId("actionTimers")
> private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
> /**
>  * Stores the element's value in the "value" state and sets the timer 1000 ms after
>  * the element's timestamp.
>  */
> @ProcessElement
> public void processElement(
> ProcessContext c,
> @StateId("value") ValueState<Integer> value,
> @TimerId("actionTimers") Timer timers) {
> value.write(c.element().getValue());
> timers.set(c.timestamp().plus(Duration.millis(1000)));
> }
> /**
>  * Timer callback: fails if the "value" state reads null, otherwise increments the
>  * count, adds it to the stored value, and re-arms the timer one second later while
>  * the count is below LOOP_COUNT.
>  *
>  * @throws IllegalStateException if the "value" state is null when the timer fires
>  */
> @OnTimer("actionTimers")
> public void onTimer(
> OnTimerContext c,
> @StateId("value") ValueState<Integer> value,
> @StateId("count") ValueState<Integer> count,
> @TimerId("actionTimers") Timer timers) {
> // The reported failure: every code path that sets this timer writes "value" first,
> // so a null read here means the state was lost while the timer was still active.
> if (value.read() == null) {
> throw new IllegalStateException("BINGO!");
> }
> // A missing count is treated as 0, so the first firing stores 1.
> Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
> count.write(counter);
> value.write(value.read() + counter);
> if (counter < LOOP_COUNT) {
> timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
> }
> }
> }
> }
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)