You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/05/15 18:00:02 UTC

[jira] [Updated] (BEAM-10848) Gauge metrics error when setting timers

     [ https://issues.apache.org/jira/browse/BEAM-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10848:
-----------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Resolved)

Hello! Due to a bug in our Jira configuration, this issue had status:Resolved but resolution:Unresolved.

I am bulk editing these issues to have resolution:Fixed.

If a different resolution is appropriate, please change it. To do this, click the "Resolve" button (you can do this even for closed issues) and set the Resolution field to the right value.

> Gauge metrics error when setting timers
> ---------------------------------------
>
>                 Key: BEAM-10848
>                 URL: https://issues.apache.org/jira/browse/BEAM-10848
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Assignee: maghamravikiran
>            Priority: P2
>             Fix For: 2.26.0
>
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Gauges are affected by setting timers, which causes them to report {{None}} values:
> {noformat}
> ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
> Traceback (most recent call last):
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
>     monitoring_infos = bundle_processor.monitoring_infos()
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
>     op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
>   File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
>   File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
>   File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
>   File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
>     payload = _encode_gauge(coder, timestamp, value)
>   File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
>     coder.get_impl().encode_to_stream(value, stream, True)
>   File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> TypeError: an integer is required
> {noformat}
> The transform has the following structure; it fails with the error above when the lines following the {{TODO}} comments are uncommented:
> {code:python}
> class StatefulOperation(beam.DoFn):
>   def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
>     self.state_size_per_key_bytes = state_size_per_key_bytes
>     self.str_coder = StrUtf8Coder().get_impl()
>     self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
>     self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
>     self.use_processing_timer = use_processing_timer
>   state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
>   state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
>   state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
>   event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
>   processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)
>   def process(self,
>               element,
>               timestamp=beam.DoFn.TimestampParam,
>               state=beam.DoFn.StateParam(state_spec),
>               state_num_bytes=beam.DoFn.StateParam(state_spec2),
>               state_num_entries=beam.DoFn.StateParam(state_spec3),
>               event_timer=beam.DoFn.TimerParam(event_timer_spec),
>               processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
>     # Append stringified element to state until the threshold has been reached
>     # The cleanup timer will then clean up and the process will repeat.
>     if state_num_bytes.read() <= self.state_size_per_key_bytes:
>       state_element = str(element)
>       state.add(state_element)
>       bytes_added = len(self.str_coder.encode_nested(state_element))
>       state_num_bytes.add(bytes_added)
>       state_num_entries.add(1)
>       timer = processing_timer if self.use_processing_timer else event_timer
>       # Set a timer which will clear the state if it grows too large
>       timer.set(timestamp.micros // 1000000 + 5)
>     # Metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(state_num_bytes.read())
>     #self.elements_gauge.set(state_num_entries.read())
>     yield element
>   @userstate.on_timer(event_timer_spec)
>   def on_event_timer(self,
>                      key=beam.DoFn.KeyParam,
>                      state=beam.DoFn.StateParam(state_spec),
>                      state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                      state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>   @userstate.on_timer(processing_timer_spec)
>   def on_processing_timer(self,
>                           state=beam.DoFn.StateParam(state_spec),
>                           state_num_bytes=beam.DoFn.StateParam(state_spec2),
>                           state_num_entries=beam.DoFn.StateParam(state_spec3)):
>     return self.timer_callback(state, state_num_bytes, state_num_entries)
>   def timer_callback(self, state, state_num_bytes, state_num_entries):
>     count = 0
>     for _ in state.read():
>       count += 1
>     state_count = state_num_entries.read()
>     if count != state_count:
>       raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
>     # Reset state bags
>     state.clear()
>     state_num_bytes.clear()
>     state_num_entries.clear()
>     # Reset metrics
>     # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
>     #self.bytes_gauge.set(0)
>     #self.elements_gauge.set(0)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)