You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/05/17 17:31:00 UTC

[jira] [Assigned] (BEAM-11971) Direct Runner State is null while active timers exist

     [ https://issues.apache.org/jira/browse/BEAM-11971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot reassigned BEAM-11971:
------------------------------------

    Assignee:     (was: Reuven Lax)

> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
>                 Key: BEAM-11971
>                 URL: https://issues.apache.org/jira/browse/BEAM-11971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Reza ardeshir rokni
>            Priority: P3
>              Labels: stale-assigned
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> State is set to {{null}} while an active timer is present; this issue does not show up in other runners.
> The following example reaches the IllegalStateException within 10-20 runs. {{LOOP_COUNT}} does not seem to be a factor, as the issue reproduces with a {{LOOP_COUNT}} of either 100 or 100000. The number of keys is a factor: the issue did not reproduce with only one key. I have not tried more than 3 keys to see whether that makes it easier to reproduce.
>  
> {code}
> package test;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import java.util.Optional;
>  
> public class Test {
>    public static void main (String [] args) throws Exception{
>        Test.testToFailure();
>    }
>    public static void testToFailure() throws Exception {
>        int count = 0;
>        while (true) {
>            failingTest();
>            System.out.println(
>                    String.format("Got to Count %s", String.valueOf(count++)));
>        }
>    }
>    public static void failingTest() throws Exception {
>        Pipeline p = Pipeline.create();
>        Instant now = Instant.now();
>        TestStream<Integer> stream =
>                TestStream.create(BigEndianIntegerCoder.of())
>                        .addElements(1)
>                        .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
>                        .addElements(2)
>                        .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
>                        .addElements(3)
>                        .advanceWatermarkToInfinity();
>        p.apply(stream)
>                .apply(WithKeys.of(x -> x))
>                .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()))
>                .apply(new TestToFail());
>        p.run();
>    }
>    public static class TestToFail
>            extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> {
>        @Override
>        public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) {
>            return input.apply(ParDo.of(new LoopingRead()));
>        }
>    }
>    public static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {
>        static int LOOP_COUNT = 100;
>        @StateId("value")
>        private final StateSpec<ValueState<Integer>> value =
>                StateSpecs.value(BigEndianIntegerCoder.of());
>        @StateId("count")
>        private final StateSpec<ValueState<Integer>> count =
>                StateSpecs.value(BigEndianIntegerCoder.of());
>        @TimerId("actionTimers")
>        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>        @ProcessElement
>        public void processElement(
>                ProcessContext c,
>                @StateId("value") ValueState<Integer> value,
>                @TimerId("actionTimers") Timer timers) {
>            value.write(c.element().getValue());
>            timers.set(c.timestamp().plus(Duration.millis(1000)));
>        }
>        /** */
>        @OnTimer("actionTimers")
>        public void onTimer(
>                OnTimerContext c,
>                @StateId("value") ValueState<Integer> value,
>                @StateId("count") ValueState<Integer> count,
>                @TimerId("actionTimers") Timer timers) {
>            if (value.read() == null) {
>                throw new IllegalStateException("BINGO!");
>            }
>            Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
>            count.write(counter);
>            value.write(value.read() + counter);
>            if (counter < LOOP_COUNT) {
>                timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
>            }
>        }
>    }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)