Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/04/14 17:20:00 UTC

[jira] [Commented] (BEAM-11971) Direct Runner State is null while active timers exist

    [ https://issues.apache.org/jira/browse/BEAM-11971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321167#comment-17321167 ] 

Beam JIRA Bot commented on BEAM-11971:
--------------------------------------

This issue is assigned but has not received an update in 30 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
>                 Key: BEAM-11971
>                 URL: https://issues.apache.org/jira/browse/BEAM-11971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Reza ardeshir rokni
>            Assignee: Kenneth Knowles
>            Priority: P2
>              Labels: stale-assigned
>
> State is set to null while an active timer is present. This issue does not appear in other runners.
>  
> The following example will reach the IllegalStateException within 10-20 runs. LOOP_COUNT does not seem to be a factor, as the failure reproduces with a LOOP_COUNT of 100 or 100000. The number of keys is a factor: the failure did not reproduce with only one key. I have not tried more than 3 keys to see whether that makes it easier to reproduce.
> public void testToFailure() throws Exception {
>   int count = 0;
>   while (true) {
>     failingTest();
>     System.out.println(String.format("Got to Count %s", String.valueOf(count++)));
>   }
> }
>
> public void failingTest() throws Exception {
>   Instant now = Instant.now();
>   TestStream<Integer> stream =
>       TestStream.create(BigEndianIntegerCoder.of())
>           .addElements(1)
>           .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
>           .addElements(2)
>           .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
>           .addElements(3)
>           .advanceWatermarkToInfinity();
>   p.apply(stream)
>       .apply(WithKeys.of(x -> x))
>       .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()))
>       .apply(new TestToFail());
>   p.run();
> }
>
> public static class TestToFail
>     extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> {
>   @Override
>   public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) {
>     return input.apply(ParDo.of(new LoopingRead()));
>   }
> }
>
> public static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {
>   static int LOOP_COUNT = 100;
>
>   @StateId("value")
>   private final StateSpec<ValueState<Integer>> value =
>       StateSpecs.value(BigEndianIntegerCoder.of());
>
>   @StateId("count")
>   private final StateSpec<ValueState<Integer>> count =
>       StateSpecs.value(BigEndianIntegerCoder.of());
>
>   @TimerId("actionTimers")
>   private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void processElement(
>       ProcessContext c,
>       @StateId("value") ValueState<Integer> value,
>       @TimerId("actionTimers") Timer timers) {
>     value.write(c.element().getValue());
>     timers.set(c.timestamp().plus(Duration.millis(1000)));
>   }
>
>   /** */
>   @OnTimer("actionTimers")
>   public void onTimer(
>       OnTimerContext c,
>       @StateId("value") ValueState<Integer> value,
>       @StateId("count") ValueState<Integer> count,
>       @TimerId("actionTimers") Timer timers) {
>     if (value.read() == null) {
>       throw new IllegalStateException("BINGO!");
>     }
>     Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
>     count.write(counter);
>     value.write(value.read() + counter);
>     if (counter < LOOP_COUNT) {
>       timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
>     }
>   }
> }
> }
>  
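For anyone trying to reproduce this, the unbounded while(true) loop in testToFailure() could be swapped for a bounded harness that reports the attempt on which the failure first appears. The following is only a sketch and is not part of the original report: the class BoundedRepro, the interface ThrowingBody, and the method firstFailingAttempt are hypothetical names, and the stand-in body in main() merely simulates a failure instead of running the pipeline. It assumes the runner may wrap the DoFn's IllegalStateException in another exception, so it walks the cause chain rather than catching the exception type directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not from the report: runs a flaky body a bounded number of times.
class BoundedRepro {

    /** Like Runnable, but allows checked exceptions (failingTest() declares throws Exception). */
    @FunctionalInterface
    interface ThrowingBody {
        void run() throws Exception;
    }

    /** Returns true if t or any exception in its cause chain is an IllegalStateException. */
    static boolean causedByIllegalState(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof IllegalStateException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs body up to maxAttempts times. Returns the 1-based attempt on which an
     * IllegalStateException (possibly wrapped) first surfaced, or -1 if it never did.
     * Any other exception is rethrown wrapped in a RuntimeException.
     */
    static int firstFailingAttempt(ThrowingBody body, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                body.run();
            } catch (Exception e) {
                if (causedByIllegalState(e)) {
                    return attempt;
                }
                throw new RuntimeException("Unexpected failure on attempt " + attempt, e);
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Stand-in for failingTest(): simulates a wrapped failure on the 13th call.
        AtomicInteger calls = new AtomicInteger();
        ThrowingBody standIn = () -> {
            if (calls.incrementAndGet() == 13) {
                throw new RuntimeException(new IllegalStateException("BINGO!"));
            }
        };
        System.out.println("First failure on attempt: " + firstFailingAttempt(standIn, 50));
    }
}
```

In a real test the stand-in would be replaced by a reference to the reporter's failingTest() method, with maxAttempts chosen comfortably above the 10-20 runs mentioned in the description.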



--
This message was sent by Atlassian Jira
(v8.3.4#803005)