You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2022/03/24 18:28:00 UTC

[jira] [Commented] (BEAM-14127) Timers with same family ids in same stage (but different transforms) are buffered together

    [ https://issues.apache.org/jira/browse/BEAM-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512016#comment-17512016 ] 

Kenneth Knowles commented on BEAM-14127:
----------------------------------------

Would this be in the SDK harness or is it in the runner?

> Timers with same family ids in same stage (but different transforms) are buffered together
> ------------------------------------------------------------------------------------------
>
>                 Key: BEAM-14127
>                 URL: https://issues.apache.org/jira/browse/BEAM-14127
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P2
>
> The following test case does not work properly:
>  
> {code:java}
> def test_dynamic_timer_clear_then_set_timer(self):
>   class EmitTwoEvents(DoFn):
>     EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
>     def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
>       yield ('1', 'set')
>       emit.set(1)
>     @on_timer(EMIT_CLEAR_SET_TIMER)
>     def emit_clear(self):
>       yield ('1', 'clear')
>   class DynamicTimerDoFn(DoFn):
>     EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
>     def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
>       if element[1] == 'set':
>         emit.set(10, dynamic_timer_tag='emit1')
>         emit.set(20, dynamic_timer_tag='emit2')
>       if element[1] == 'clear':
>         emit.set(30, dynamic_timer_tag='emit3')
>         emit.clear(dynamic_timer_tag='emit3')
>         emit.set(40, dynamic_timer_tag='emit3')
>       return []
>     @on_timer(EMIT_TIMER_FAMILY)
>     def emit_callback(
>         self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
>       yield (tag, ts)
>   with TestPipeline() as p:
>     res = (
>         p
>         | beam.Create([('1', 'impulse')])
>         | beam.ParDo(EmitTwoEvents())
>         | beam.ParDo(DynamicTimerDoFn()))
>     assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]) {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)