You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2021/03/18 19:28:00 UTC
[jira] [Created] (BEAM-12019) apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
Valentyn Tymofieiev created BEAM-12019:
------------------------------------------
Summary: apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
Key: BEAM-12019
URL: https://issues.apache.org/jira/browse/BEAM-12019
Project: Beam
Issue Type: Improvement
Components: runner-flink
Reporter: Valentyn Tymofieiev
Sample error:
https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/
{noformat}
Error Message
AssertionError: Items in the second set but not the first: 'stateful).beam.metric:statecache:hit: 11' 'stateful).beam.metric:statecache:put: 1' 'stateful).beam.metric:statecache:miss: 1' 'stateful).beam.metric:statecache:get_total: 120' 'stateful).beam.metric:statecache:size: 10' 'stateful).beam.metric:statecache:get: 12' 'stateful).beam.metric:statecache:evict: 0' 'stateful).beam.metric:statecache:capacity: 123' 'stateful).beam.metric:statecache:put_total: 10' 'stateful).beam.metric:statecache:evict_total: 0' 'counter: 110' 'stateful).beam.metric:statecache:miss_total: 10' 'stateful).beam.metric:statecache:hit_total: 110'
Stacktrace
self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>
def test_flink_metrics(self):
"""Run a simple DoFn that increments a counter and verifies state
caching metrics. Verifies that its expected value is written to a
temporary file by the FileReporter"""
counter_name = 'elem_counter'
state_spec = userstate.BagStateSpec('state', VarIntCoder())
class DoFn(beam.DoFn):
def __init__(self):
self.counter = Metrics.counter(self.__class__, counter_name)
_LOGGER.info('counter: %s' % self.counter.metric_name)
def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
# Trigger materialization
list(state.read())
state.add(1)
self.counter.inc()
options = self.create_options()
# Test only supports parallelism of 1
options._all_options['parallelism'] = 1
# Create multiple bundles to test cache metrics
options._all_options['max_bundle_size'] = 10
options._all_options['max_bundle_time_millis'] = 95130590130
experiments = options.view_as(DebugOptions).experiments or []
experiments.append('state_cache_size=123')
options.view_as(DebugOptions).experiments = experiments
with Pipeline(self.get_runner(), options) as p:
# pylint: disable=expression-not-assigned
(
p
| "create" >> beam.Create(list(range(0, 110)))
| "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
| "stateful" >> beam.ParDo(DoFn()))
lines_expected = {'counter: 110'}
if options.view_as(StandardOptions).streaming:
lines_expected.update([
# Gauges for the last finished bundle
'stateful.beam.metric:statecache:capacity: 123',
'stateful.beam.metric:statecache:size: 10',
'stateful.beam.metric:statecache:get: 20',
'stateful.beam.metric:statecache:miss: 0',
'stateful.beam.metric:statecache:hit: 20',
'stateful.beam.metric:statecache:put: 0',
'stateful.beam.metric:statecache:evict: 0',
# Counters
'stateful.beam.metric:statecache:get_total: 220',
'stateful.beam.metric:statecache:miss_total: 10',
'stateful.beam.metric:statecache:hit_total: 210',
'stateful.beam.metric:statecache:put_total: 10',
'stateful.beam.metric:statecache:evict_total: 0',
])
else:
# Batch has a different processing model. All values for
# a key are processed at once.
lines_expected.update([
# Gauges
'stateful).beam.metric:statecache:capacity: 123',
# For the first key, the cache token will not be set yet.
# It's lazily initialized after first access in StateRequestHandlers
'stateful).beam.metric:statecache:size: 10',
# We have 11 here because there are 110 / 10 elements per key
'stateful).beam.metric:statecache:get: 12',
'stateful).beam.metric:statecache:miss: 1',
'stateful).beam.metric:statecache:hit: 11',
# State is flushed back once per key
'stateful).beam.metric:statecache:put: 1',
'stateful).beam.metric:statecache:evict: 0',
# Counters
'stateful).beam.metric:statecache:get_total: 120',
'stateful).beam.metric:statecache:miss_total: 10',
'stateful).beam.metric:statecache:hit_total: 110',
'stateful).beam.metric:statecache:put_total: 10',
'stateful).beam.metric:statecache:evict_total: 0',
])
lines_actual = set()
with open(self.test_metrics_path, 'r') as f:
for line in f:
for metric_str in lines_expected:
metric_name = metric_str.split()[0]
if metric_str in line:
lines_actual.add(metric_str)
elif metric_name in line:
lines_actual.add(line)
> self.assertSetEqual(lines_actual, lines_expected)
E AssertionError: Items in the second set but not the first:
E 'stateful).beam.metric:statecache:hit: 11'
E 'stateful).beam.metric:statecache:put: 1'
E 'stateful).beam.metric:statecache:miss: 1'
E 'stateful).beam.metric:statecache:get_total: 120'
E 'stateful).beam.metric:statecache:size: 10'
E 'stateful).beam.metric:statecache:get: 12'
E 'stateful).beam.metric:statecache:evict: 0'
E 'stateful).beam.metric:statecache:capacity: 123'
E 'stateful).beam.metric:statecache:put_total: 10'
E 'stateful).beam.metric:statecache:evict_total: 0'
E 'counter: 110'
E 'stateful).beam.metric:statecache:miss_total: 10'
E 'stateful).beam.metric:statecache:hit_total: 110'
apache_beam/runners/portability/flink_runner_test.py:390: AssertionError
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)