Posted to commits@beam.apache.org by "Ben Chambers (JIRA)" <ji...@apache.org> on 2018/03/08 01:12:00 UTC

[jira] [Created] (BEAM-3806) DirectRunner hangs if multiple timers set in the same bundle

Ben Chambers created BEAM-3806:
----------------------------------

             Summary: DirectRunner hangs if multiple timers set in the same bundle
                 Key: BEAM-3806
                 URL: https://issues.apache.org/jira/browse/BEAM-3806
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
            Reporter: Ben Chambers
            Assignee: Thomas Groh


See the repro below:
{code:java}
package com.simbly.beam.cassandra;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class DirectRunnerTest {

  @Rule
  public TestPipeline pipeline = TestPipeline.create();

  @Test
  public void badTimerBehavior() {
    TestStream<KV<String, String>> stream = TestStream
        .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .addElements(KV.of("key1", "v1"))
        .advanceWatermarkToInfinity();

    PCollection<String> result = pipeline
        .apply(stream)
        .apply(ParDo.of(new TestDoFn()));
    PAssert.that(result).containsInAnyOrder("It works");

    pipeline.run().waitUntilFinish();
  }

  private static class TestDoFn extends DoFn<KV<String, String>, String> {
    @TimerId("timer")
    private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(ProcessContext c,
        @TimerId("timer") Timer timer) {
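      // Set the same timer twice while processing a single element, i.e.
      // within the same bundle. The second call is what triggers the hang.
      timer.offset(Duration.standardMinutes(10)).setRelative();
      timer.offset(Duration.standardMinutes(30)).setRelative();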
    }

    @OnTimer("timer")
    public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
      c.output("It works");
    }
  }
}
{code}
From inspection, this seems to be caused by the logic in [WatermarkManager|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L313], which does the following if there are multiple timers for a key:
 # Adds the first timer to the `pendingTimers`, `keyTimers`, and `existingTimersForKey`.
 # Removes the first timer from `keyTimers`
 # Adds the second timer to `keyTimers` and `existingTimersForKey`.

This leaves the three collections inconsistent: `pendingTimers` has only the first timer, `keyTimers` has only the second, and `existingTimersForKey` has both. This becomes more problematic since one of these collections is used for *firing* (and thus releasing holds) and the other is used for holds.
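
To make the divergence concrete, here is a minimal sketch that replays the three steps above on plain Java collections. It is a simplified model of the behavior as described in this report, not the actual WatermarkManager code: the class name `TimerBookkeepingSketch`, the `setTimer` method, and the string timer labels are all hypothetical, and the real collections hold `TimerData` keyed by namespace and timer ID.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TimerBookkeepingSketch {
  // Simplified stand-ins for the three collections named in the report.
  public final Set<String> pendingTimers = new LinkedHashSet<>();
  public final Set<String> keyTimers = new LinkedHashSet<>();
  public final List<String> existingTimersForKey = new ArrayList<>();

  /** Replays the reported handling of one "set timer" call for a single key. */
  public void setTimer(String timer) {
    if (existingTimersForKey.isEmpty()) {
      // Step 1: the first timer is added to pendingTimers and keyTimers
      // (and to existingTimersForKey below).
      pendingTimers.add(timer);
      keyTimers.add(timer);
    } else {
      // Step 2: the previous timer is removed from keyTimers only.
      String previous = existingTimersForKey.get(existingTimersForKey.size() - 1);
      keyTimers.remove(previous);
      // Step 3: the new timer is added to keyTimers, but not to pendingTimers.
      keyTimers.add(timer);
    }
    existingTimersForKey.add(timer);
  }

  public static void main(String[] args) {
    TimerBookkeepingSketch sketch = new TimerBookkeepingSketch();
    sketch.setTimer("timer@+10min");
    sketch.setTimer("timer@+30min");

    System.out.println("pendingTimers        = " + sketch.pendingTimers);
    System.out.println("keyTimers            = " + sketch.keyTimers);
    System.out.println("existingTimersForKey = " + sketch.existingTimersForKey);
  }
}
```

After the two calls, `pendingTimers` holds only the +10min timer, `keyTimers` holds only the +30min timer, and `existingTimersForKey` holds both, which matches the inconsistency described above.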


