You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/22 17:40:00 UTC
[jira] [Updated] (BEAM-12019) apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
[ https://issues.apache.org/jira/browse/BEAM-12019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-12019:
-----------------------------------
Component/s: test-failures
> apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
> ------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12019
> URL: https://issues.apache.org/jira/browse/BEAM-12019
> Project: Beam
> Issue Type: Bug
> Components: flake, runner-flink, test-failures
> Reporter: Valentyn Tymofieiev
> Priority: P1
>
> Sample error:
> https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/
> {noformat}
> Error Message
> AssertionError: Items in the second set but not the first: 'stateful).beam.metric:statecache:hit: 11' 'stateful).beam.metric:statecache:put: 1' 'stateful).beam.metric:statecache:miss: 1' 'stateful).beam.metric:statecache:get_total: 120' 'stateful).beam.metric:statecache:size: 10' 'stateful).beam.metric:statecache:get: 12' 'stateful).beam.metric:statecache:evict: 0' 'stateful).beam.metric:statecache:capacity: 123' 'stateful).beam.metric:statecache:put_total: 10' 'stateful).beam.metric:statecache:evict_total: 0' 'counter: 110' 'stateful).beam.metric:statecache:miss_total: 10' 'stateful).beam.metric:statecache:hit_total: 110'
> Stacktrace
> self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>
> def test_flink_metrics(self):
> """Run a simple DoFn that increments a counter and verifies state
> caching metrics. Verifies that its expected value is written to a
> temporary file by the FileReporter"""
>
> counter_name = 'elem_counter'
> state_spec = userstate.BagStateSpec('state', VarIntCoder())
>
> class DoFn(beam.DoFn):
> def __init__(self):
> self.counter = Metrics.counter(self.__class__, counter_name)
> _LOGGER.info('counter: %s' % self.counter.metric_name)
>
> def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
> # Trigger materialization
> list(state.read())
> state.add(1)
> self.counter.inc()
>
> options = self.create_options()
> # Test only supports parallelism of 1
> options._all_options['parallelism'] = 1
> # Create multiple bundles to test cache metrics
> options._all_options['max_bundle_size'] = 10
> options._all_options['max_bundle_time_millis'] = 95130590130
> experiments = options.view_as(DebugOptions).experiments or []
> experiments.append('state_cache_size=123')
> options.view_as(DebugOptions).experiments = experiments
> with Pipeline(self.get_runner(), options) as p:
> # pylint: disable=expression-not-assigned
> (
> p
> | "create" >> beam.Create(list(range(0, 110)))
> | "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
> | "stateful" >> beam.ParDo(DoFn()))
>
> lines_expected = {'counter: 110'}
> if options.view_as(StandardOptions).streaming:
> lines_expected.update([
> # Gauges for the last finished bundle
> 'stateful.beam.metric:statecache:capacity: 123',
> 'stateful.beam.metric:statecache:size: 10',
> 'stateful.beam.metric:statecache:get: 20',
> 'stateful.beam.metric:statecache:miss: 0',
> 'stateful.beam.metric:statecache:hit: 20',
> 'stateful.beam.metric:statecache:put: 0',
> 'stateful.beam.metric:statecache:evict: 0',
> # Counters
> 'stateful.beam.metric:statecache:get_total: 220',
> 'stateful.beam.metric:statecache:miss_total: 10',
> 'stateful.beam.metric:statecache:hit_total: 210',
> 'stateful.beam.metric:statecache:put_total: 10',
> 'stateful.beam.metric:statecache:evict_total: 0',
> ])
> else:
> # Batch has a different processing model. All values for
> # a key are processed at once.
> lines_expected.update([
> # Gauges
> 'stateful).beam.metric:statecache:capacity: 123',
> # For the first key, the cache token will not be set yet.
> # It's lazily initialized after first access in StateRequestHandlers
> 'stateful).beam.metric:statecache:size: 10',
> # We have 11 here because there are 110 / 10 elements per key
> 'stateful).beam.metric:statecache:get: 12',
> 'stateful).beam.metric:statecache:miss: 1',
> 'stateful).beam.metric:statecache:hit: 11',
> # State is flushed back once per key
> 'stateful).beam.metric:statecache:put: 1',
> 'stateful).beam.metric:statecache:evict: 0',
> # Counters
> 'stateful).beam.metric:statecache:get_total: 120',
> 'stateful).beam.metric:statecache:miss_total: 10',
> 'stateful).beam.metric:statecache:hit_total: 110',
> 'stateful).beam.metric:statecache:put_total: 10',
> 'stateful).beam.metric:statecache:evict_total: 0',
> ])
> lines_actual = set()
> with open(self.test_metrics_path, 'r') as f:
> for line in f:
> for metric_str in lines_expected:
> metric_name = metric_str.split()[0]
> if metric_str in line:
> lines_actual.add(metric_str)
> elif metric_name in line:
> lines_actual.add(line)
> > self.assertSetEqual(lines_actual, lines_expected)
> E AssertionError: Items in the second set but not the first:
> E 'stateful).beam.metric:statecache:hit: 11'
> E 'stateful).beam.metric:statecache:put: 1'
> E 'stateful).beam.metric:statecache:miss: 1'
> E 'stateful).beam.metric:statecache:get_total: 120'
> E 'stateful).beam.metric:statecache:size: 10'
> E 'stateful).beam.metric:statecache:get: 12'
> E 'stateful).beam.metric:statecache:evict: 0'
> E 'stateful).beam.metric:statecache:capacity: 123'
> E 'stateful).beam.metric:statecache:put_total: 10'
> E 'stateful).beam.metric:statecache:evict_total: 0'
> E 'counter: 110'
> E 'stateful).beam.metric:statecache:miss_total: 10'
> E 'stateful).beam.metric:statecache:hit_total: 110'
> apache_beam/runners/portability/flink_runner_test.py:390: AssertionError
> {noformat}
--
This message was sent by Atlassian Jira (v8.3.4#803005)