You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Maximilian Michels (Jira)" <ji...@apache.org> on 2020/09/02 08:43:00 UTC
[jira] [Updated] (BEAM-10848) Gauge metrics error when setting timers
[ https://issues.apache.org/jira/browse/BEAM-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maximilian Michels updated BEAM-10848:
--------------------------------------
Description:
Setting timers causes gauge metrics to end up with {{None}} values, which then fail to encode as integers when the SDK harness reports metrics:
{noformat}
ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
Traceback (most recent call last):
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
response = task()
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
payload = _encode_gauge(coder, timestamp, value)
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
coder.get_impl().encode_to_stream(value, stream, True)
File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
TypeError: an integer is required
{noformat}
The transform has the following structure. It fails with the error above once the lines following each {{TODO}} comment are uncommented:
{code:python}
class StatefulOperation(beam.DoFn):
def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
self.state_size_per_key_bytes = state_size_per_key_bytes
self.str_coder = StrUtf8Coder().get_impl()
self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
self.use_processing_timer = use_processing_timer
state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)
def process(self,
element,
timestamp=beam.DoFn.TimestampParam,
state=beam.DoFn.StateParam(state_spec),
state_num_bytes=beam.DoFn.StateParam(state_spec2),
state_num_entries=beam.DoFn.StateParam(state_spec3),
event_timer=beam.DoFn.TimerParam(event_timer_spec),
processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
# Append stringified element to state until the threshold has been reached
# The cleanup timer will then clean up and the process will repeat.
if state_num_bytes.read() <= self.state_size_per_key_bytes:
state_element = str(element)
state.add(state_element)
bytes_added = len(self.str_coder.encode_nested(state_element))
state_num_bytes.add(bytes_added)
state_num_entries.add(1)
timer = processing_timer if self.use_processing_timer else event_timer
# Set a timer which will clear the state if it grows too large
timer.set(timestamp.micros // 1000000 + 5)
# Metrics
# TODO Unfortunately buggy with timers, needs to be fixed in Beam:
#self.bytes_gauge.set(state_num_bytes.read())
#self.elements_gauge.set(state_num_entries.read())
yield element
@userstate.on_timer(event_timer_spec)
def on_event_timer(self,
key=beam.DoFn.KeyParam,
state=beam.DoFn.StateParam(state_spec),
state_num_bytes=beam.DoFn.StateParam(state_spec2),
state_num_entries=beam.DoFn.StateParam(state_spec3)):
return self.timer_callback(state, state_num_bytes, state_num_entries)
@userstate.on_timer(processing_timer_spec)
def on_processing_timer(self,
state=beam.DoFn.StateParam(state_spec),
state_num_bytes=beam.DoFn.StateParam(state_spec2),
state_num_entries=beam.DoFn.StateParam(state_spec3)):
return self.timer_callback(state, state_num_bytes, state_num_entries)
def timer_callback(self, state, state_num_bytes, state_num_entries):
count = 0
for _ in state.read():
count += 1
state_count = state_num_entries.read()
if count != state_count:
raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
# Reset state bags
state.clear()
state_num_bytes.clear()
state_num_entries.clear()
# Reset metrics
# TODO Unfortunately buggy with timers, needs to be fixed in Beam:
#self.bytes_gauge.set(0)
#self.elements_gauge.set(0)
{code}
was:
Gauges are affected by setting timers leading to {{None}} values:
{noformat}
ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
Traceback (most recent call last):
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
response = task()
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
payload = _encode_gauge(coder, timestamp, value)
File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
coder.get_impl().encode_to_stream(value, stream, True)
File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
TypeError: an integer is required
{noformat}
The application has the following structure:
{code:python}
class StatefulOperation(beam.DoFn):
def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
self.state_size_per_key_bytes = state_size_per_key_bytes
self.str_coder = StrUtf8Coder().get_impl()
self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
self.use_processing_timer = use_processing_timer
state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)
def process(self,
element,
timestamp=beam.DoFn.TimestampParam,
state=beam.DoFn.StateParam(state_spec),
state_num_bytes=beam.DoFn.StateParam(state_spec2),
state_num_entries=beam.DoFn.StateParam(state_spec3),
event_timer=beam.DoFn.TimerParam(event_timer_spec),
processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
# Append stringified element to state until the threshold has been reached
# The cleanup timer will then clean up and the process will repeat.
if state_num_bytes.read() <= self.state_size_per_key_bytes:
state_element = str(element)
state.add(state_element)
bytes_added = len(self.str_coder.encode_nested(state_element))
state_num_bytes.add(bytes_added)
state_num_entries.add(1)
timer = processing_timer if self.use_processing_timer else event_timer
# Set a timer which will clear the state if it grows too large
timer.set(timestamp.micros // 1000000 + 5)
# Metrics
# TODO Unfortunately buggy with timers, needs to be fixed in Beam:
#self.bytes_gauge.set(state_num_bytes.read())
#self.elements_gauge.set(state_num_entries.read())
yield element
@userstate.on_timer(event_timer_spec)
def on_event_timer(self,
key=beam.DoFn.KeyParam,
state=beam.DoFn.StateParam(state_spec),
state_num_bytes=beam.DoFn.StateParam(state_spec2),
state_num_entries=beam.DoFn.StateParam(state_spec3)):
return self.timer_callback(state, state_num_bytes, state_num_entries)
@userstate.on_timer(processing_timer_spec)
def on_processing_timer(self,
state=beam.DoFn.StateParam(state_spec),
state_num_bytes=beam.DoFn.StateParam(state_spec2),
state_num_entries=beam.DoFn.StateParam(state_spec3)):
return self.timer_callback(state, state_num_bytes, state_num_entries)
def timer_callback(self, state, state_num_bytes, state_num_entries):
count = 0
for _ in state.read():
count += 1
state_count = state_num_entries.read()
if count != state_count:
raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
# Reset state bags
state.clear()
state_num_bytes.clear()
state_num_entries.clear()
# Reset metrics
# TODO Unfortunately buggy with timers, needs to be fixed in Beam:
#self.bytes_gauge.set(0)
#self.elements_gauge.set(0)
{code}
> Gauge metrics error when setting timers
> ---------------------------------------
>
> Key: BEAM-10848
> URL: https://issues.apache.org/jira/browse/BEAM-10848
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Maximilian Michels
> Priority: P2
>
> Gauges are affected by setting timers leading to {{None}} values:
> {noformat}
> ERROR:apache_beam.runners.worker.sdk_worker:Error processing instruction 147. Original traceback is
> Traceback (most recent call last):
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
> response = task()
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 516, in process_bundle
> monitoring_infos = bundle_processor.monitoring_infos()
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1107, in monitoring_infos
> op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
> File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.Operation.monitoring_infos
> File "apache_beam/runners/worker/operations.py", line 347, in apache_beam.runners.worker.operations.Operation.monitoring_infos
> File "apache_beam/runners/worker/operations.py", line 386, in apache_beam.runners.worker.operations.Operation.user_monitoring_infos
> File "apache_beam/metrics/execution.py", line 261, in apache_beam.metrics.execution.MetricsContainer.to_runner_api_monitoring_infos
> File "apache_beam/metrics/cells.py", line 222, in apache_beam.metrics.cells.GaugeCell.to_runner_api_monitoring_info
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 222, in int64_user_gauge
> payload = _encode_gauge(coder, timestamp, value)
> File "/Users/max/Consulting/Lyft/dev/streamperfbench/beamperfk8s/venv/lib/python3.7/site-packages/apache_beam/metrics/monitoring_infos.py", line 397, in _encode_gauge
> coder.get_impl().encode_to_stream(value, stream, True)
> File "apache_beam/coders/coder_impl.py", line 690, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> File "apache_beam/coders/coder_impl.py", line 692, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
> TypeError: an integer is required
> {noformat}
> The transform has the following structure and errors when the lines following {{TODO}} have been uncommented:
> {code:python}
> class StatefulOperation(beam.DoFn):
> def __init__(self, state_size_per_key_bytes, use_processing_timer=False):
> self.state_size_per_key_bytes = state_size_per_key_bytes
> self.str_coder = StrUtf8Coder().get_impl()
> self.bytes_gauge = Metrics.gauge('synthetic', 'state_bytes')
> self.elements_gauge = Metrics.gauge('synthetic', 'state_elements')
> self.use_processing_timer = use_processing_timer
> state_spec = userstate.BagStateSpec('state', StrUtf8Coder())
> state_spec2 = userstate.CombiningValueStateSpec('state_size_bytes', combine_fn=sum)
> state_spec3 = userstate.CombiningValueStateSpec('num_state_entries', combine_fn=sum)
> event_timer_spec = userstate.TimerSpec('event_timer', beam.TimeDomain.WATERMARK)
> processing_timer_spec = userstate.TimerSpec('proc_timer', beam.TimeDomain.REAL_TIME)
> def process(self,
> element,
> timestamp=beam.DoFn.TimestampParam,
> state=beam.DoFn.StateParam(state_spec),
> state_num_bytes=beam.DoFn.StateParam(state_spec2),
> state_num_entries=beam.DoFn.StateParam(state_spec3),
> event_timer=beam.DoFn.TimerParam(event_timer_spec),
> processing_timer=beam.DoFn.TimerParam(processing_timer_spec)):
> # Append stringified element to state until the threshold has been reached
> # The cleanup timer will then clean up and the process will repeat.
> if state_num_bytes.read() <= self.state_size_per_key_bytes:
> state_element = str(element)
> state.add(state_element)
> bytes_added = len(self.str_coder.encode_nested(state_element))
> state_num_bytes.add(bytes_added)
> state_num_entries.add(1)
> timer = processing_timer if self.use_processing_timer else event_timer
> # Set a timer which will clear the state if it grows too large
> timer.set(timestamp.micros // 1000000 + 5)
> # Metrics
> # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
> #self.bytes_gauge.set(state_num_bytes.read())
> #self.elements_gauge.set(state_num_entries.read())
> yield element
> @userstate.on_timer(event_timer_spec)
> def on_event_timer(self,
> key=beam.DoFn.KeyParam,
> state=beam.DoFn.StateParam(state_spec),
> state_num_bytes=beam.DoFn.StateParam(state_spec2),
> state_num_entries=beam.DoFn.StateParam(state_spec3)):
> return self.timer_callback(state, state_num_bytes, state_num_entries)
> @userstate.on_timer(processing_timer_spec)
> def on_processing_timer(self,
> state=beam.DoFn.StateParam(state_spec),
> state_num_bytes=beam.DoFn.StateParam(state_spec2),
> state_num_entries=beam.DoFn.StateParam(state_spec3)):
> return self.timer_callback(state, state_num_bytes, state_num_entries)
> def timer_callback(self, state, state_num_bytes, state_num_entries):
> count = 0
> for _ in state.read():
> count += 1
> state_count = state_num_entries.read()
> if count != state_count:
> raise Exception("Actual number of entries (%s) did not match expected (%s)" % (count, state_count))
> # Reset state bags
> state.clear()
> state_num_bytes.clear()
> state_num_entries.clear()
> # Reset metrics
> # TODO Unfortunately buggy with timers, needs to be fixed in Beam:
> #self.bytes_gauge.set(0)
> #self.elements_gauge.set(0)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)