Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2022/03/24 18:28:00 UTC
[jira] [Updated] (BEAM-14127) Timers with same family ids in same stage (but different transforms) are buffered together
[ https://issues.apache.org/jira/browse/BEAM-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-14127:
-----------------------------------
Status: Open (was: Triage Needed)
> Timers with same family ids in same stage (but different transforms) are buffered together
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-14127
> URL: https://issues.apache.org/jira/browse/BEAM-14127
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Priority: P2
>
> The following test case does not work properly:
>
> {code:python}
> # Imports the test needs; assumed to come from the enclosing test module.
> import apache_beam as beam
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.util import assert_that, equal_to
> from apache_beam.transforms.core import DoFn
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.userstate import TimerSpec, on_timer
>
> def test_dynamic_timer_clear_then_set_timer(self):
>   class EmitTwoEvents(DoFn):
>     EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
>
>     def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
>       yield ('1', 'set')
>       emit.set(1)
>
>     @on_timer(EMIT_CLEAR_SET_TIMER)
>     def emit_clear(self):
>       yield ('1', 'clear')
>
>   class DynamicTimerDoFn(DoFn):
>     EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
>
>     def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
>       if element[1] == 'set':
>         emit.set(10, dynamic_timer_tag='emit1')
>         emit.set(20, dynamic_timer_tag='emit2')
>       if element[1] == 'clear':
>         emit.set(30, dynamic_timer_tag='emit3')
>         emit.clear(dynamic_timer_tag='emit3')
>         emit.set(40, dynamic_timer_tag='emit3')
>       return []
>
>     @on_timer(EMIT_TIMER_FAMILY)
>     def emit_callback(
>         self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
>       yield (tag, ts)
>
>   with TestPipeline() as p:
>     res = (
>         p
>         | beam.Create([('1', 'impulse')])
>         | beam.ParDo(EmitTwoEvents())
>         | beam.ParDo(DynamicTimerDoFn()))
>     assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]))
> {code}
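
The behaviour named in the issue title can be illustrated with a toy model. This is only a sketch and not Beam's actual implementation: the function `buffer_timers`, the tuple layout, and the ids `transform_a` and `transform_b` are all hypothetical. It shows how buffering timers by family id alone merges timers from different transforms, while keying by (transform id, family id) keeps them apart.

```python
from collections import defaultdict


def buffer_timers(timers, key_fn):
    """Group timers into buffers chosen by key_fn (hypothetical toy model).

    Each timer is a (transform_id, family_id, tag, timestamp) tuple.
    Within one buffer, a later set for the same tag overwrites the earlier one.
    """
    buffers = defaultdict(dict)
    for transform_id, family_id, tag, timestamp in timers:
        buffers[key_fn(transform_id, family_id)][tag] = timestamp
    return dict(buffers)


# Two transforms in the same stage use the same family id and the same tag.
timers = [
    ("transform_a", "emit", "t1", 10),
    ("transform_b", "emit", "t1", 20),
]

# Keyed by family id only: both timers share one buffer, so one is overwritten.
by_family = buffer_timers(timers, lambda transform_id, family_id: family_id)

# Keyed by (transform id, family id): each transform keeps its own buffer.
by_transform_and_family = buffer_timers(
    timers, lambda transform_id, family_id: (transform_id, family_id))

print(by_family)
print(by_transform_and_family)
```

In this model the family-only key leaves a single entry for tag `t1`, so the timer set by `transform_a` is lost; the composite key retains both timers.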
--
This message was sent by Atlassian Jira
(v8.20.1#820001)