Posted to issues@beam.apache.org by "Maximilian Michels (Jira)" <ji...@apache.org> on 2020/05/04 09:05:00 UTC

[jira] [Assigned] (BEAM-9801) Setting a timer from a timer callback fails

     [ https://issues.apache.org/jira/browse/BEAM-9801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels reassigned BEAM-9801:
----------------------------------------

    Assignee: Maximilian Michels

> Setting a timer from a timer callback fails
> -------------------------------------------
>
>                 Key: BEAM-9801
>                 URL: https://issues.apache.org/jira/browse/BEAM-9801
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.21.0
>
>          Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> Hi,
> I'm trying to set a timer from a timer callback in the Python SDK:
> {code:Python}
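> import apache_beam as beam
> from apache_beam.transforms import userstate
>
> class GenerateLoad(beam.DoFn):
>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>
>   def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
>     self.key = element[0]
>     # Set the initial timer while processing an element.
>     timer.set(0)
>
>   @userstate.on_timer(timer_spec)
>   def process_timer(self, timer=beam.DoFn.TimerParam(timer_spec)):
>     # Setting the timer again from within the callback triggers the failure.
>     timer.set(0)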
> {code}
> This yields the following Python stack trace:
> {noformat}
> INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
> INFO:apache_beam.utils.subprocess_server: response = task()
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
> INFO:apache_beam.utils.subprocess_server: lambda: self.create_worker().do_instruction(request), request)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
> INFO:apache_beam.utils.subprocess_server: getattr(request, request_type), request.instruction_id)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
> INFO:apache_beam.utils.subprocess_server: bundle_processor.process_bundle(instruction_id))
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
> INFO:apache_beam.utils.subprocess_server: element.timer_family_id, timer_data)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/operations.py", line 688, in process_timer
> INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 990, in process_user_timer
> INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1043, in _reraise_augmented
> INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 988, in process_user_timer
> INFO:apache_beam.utils.subprocess_server: self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 517, in invoke_user_timer
> INFO:apache_beam.utils.subprocess_server: self.user_state_context, key, window, timestamp))
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1093, in process_outputs
> INFO:apache_beam.utils.subprocess_server: for result in results:
> INFO:apache_beam.utils.subprocess_server: File "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py", line 185, in process_timer
> INFO:apache_beam.utils.subprocess_server: timer.set(0)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 589, in set
> INFO:apache_beam.utils.subprocess_server: self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
> INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
> INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
> INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
> INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object has no attribute 'micros' [while running 'GenerateLoad']
> {noformat}
> Looking at the code base, I'm not sure we have tests for timer output timestamps. Am I missing something?
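
The last frames of the trace suggest that the timer's hold timestamp was None when the timer coder tried to encode it. The following is a minimal sketch of that failure mode in plain Python, not Beam's actual coder code: the Timestamp and Timer classes and the encode_* helpers are hypothetical stand-ins, and only the `value.micros // 1000` expression is taken from the trace.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Timestamp:
    """Hypothetical stand-in for a timestamp stored in microseconds."""
    micros: int


@dataclass
class Timer:
    """Hypothetical stand-in for the timer record that gets encoded."""
    fire_timestamp: Timestamp
    hold_timestamp: Optional[Timestamp]  # assumed to be None in the failing case


def encode_timestamp(value: Timestamp) -> int:
    # Same expression as the failing line in the trace.
    return value.micros // 1000


def encode_timer(timer: Timer) -> Tuple[int, int]:
    # Encodes both timestamps; raises AttributeError if either is None.
    return (encode_timestamp(timer.fire_timestamp),
            encode_timestamp(timer.hold_timestamp))


def reproduce() -> str:
    """Encode a timer without a hold timestamp and return the error text."""
    try:
        encode_timer(Timer(Timestamp(0), None))
    except AttributeError as exc:
        return str(exc)
    return "no error"
```

A test for timer output timestamps could presumably assert that a timer set from a callback carries a non-None hold timestamp before it reaches the coder; where such a default should be assigned in the SDK is not something this sketch can answer.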


