You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Pablo Estrada (Jira)" <ji...@apache.org> on 2022/03/18 17:08:00 UTC
[jira] [Created] (BEAM-14127) Timers with same family ids in same stage (but different transforms) are buffered together
Pablo Estrada created BEAM-14127:
------------------------------------
Summary: Timers with same family ids in same stage (but different transforms) are buffered together
Key: BEAM-14127
URL: https://issues.apache.org/jira/browse/BEAM-14127
Project: Beam
Issue Type: Sub-task
Components: sdk-py-core
Reporter: Pablo Estrada
The following test case does not work properly:
{code:java}
def test_dynamic_timer_clear_then_set_timer(self):
class EmitTwoEvents(DoFn):
EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
yield ('1', 'set')
emit.set(1)
@on_timer(EMIT_CLEAR_SET_TIMER)
def emit_clear(self):
yield ('1', 'clear')
class DynamicTimerDoFn(DoFn):
EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
if element[1] == 'set':
emit.set(10, dynamic_timer_tag='emit1')
emit.set(20, dynamic_timer_tag='emit2')
if element[1] == 'clear':
emit.set(30, dynamic_timer_tag='emit3')
emit.clear(dynamic_timer_tag='emit3')
emit.set(40, dynamic_timer_tag='emit3')
return []
@on_timer(EMIT_TIMER_FAMILY)
def emit_callback(
self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
yield (tag, ts)
with TestPipeline() as p:
res = (
p
| beam.Create([('1', 'impulse')])
| beam.ParDo(EmitTwoEvents())
| beam.ParDo(DynamicTimerDoFn()))
assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]) {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)