You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Pablo Estrada (Jira)" <ji...@apache.org> on 2022/03/18 17:08:00 UTC

[jira] [Created] (BEAM-14127) Timers with same family ids in same stage (but different transforms) are buffered together

Pablo Estrada created BEAM-14127:
------------------------------------

             Summary: Timers with same family ids in same stage (but different transforms) are buffered together
                 Key: BEAM-14127
                 URL: https://issues.apache.org/jira/browse/BEAM-14127
             Project: Beam
          Issue Type: Sub-task
          Components: sdk-py-core
            Reporter: Pablo Estrada


The following test case does not work properly:

 
{code:java}
def test_dynamic_timer_clear_then_set_timer(self):

  class EmitTwoEvents(DoFn):
    EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)

    def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
      yield ('1', 'set')
      emit.set(1)

    @on_timer(EMIT_CLEAR_SET_TIMER)
    def emit_clear(self):
      yield ('1', 'clear')

  class DynamicTimerDoFn(DoFn):
    EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)

    def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
      if element[1] == 'set':
        emit.set(10, dynamic_timer_tag='emit1')
        emit.set(20, dynamic_timer_tag='emit2')
      if element[1] == 'clear':
        emit.set(30, dynamic_timer_tag='emit3')
        emit.clear(dynamic_timer_tag='emit3')
        emit.set(40, dynamic_timer_tag='emit3')
      return []

    @on_timer(EMIT_TIMER_FAMILY)
    def emit_callback(
        self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
      yield (tag, ts)

  with TestPipeline() as p:
    res = (
        p
        | beam.Create([('1', 'impulse')])
        | beam.ParDo(EmitTwoEvents())
        | beam.ParDo(DynamicTimerDoFn()))
    assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]) {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)