You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2021/03/18 19:29:00 UTC

[jira] [Commented] (BEAM-12019) apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky

    [ https://issues.apache.org/jira/browse/BEAM-12019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304381#comment-17304381 ] 

Valentyn Tymofieiev commented on BEAM-12019:
--------------------------------------------

cc: [~ibzib]

> apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
> ------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12019
>                 URL: https://issues.apache.org/jira/browse/BEAM-12019
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Valentyn Tymofieiev
>            Priority: P1
>
> Sample error:
> https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/
> {noformat}
> Error Message
> AssertionError: Items in the second set but not the first: 'stateful).beam.metric:statecache:hit: 11' 'stateful).beam.metric:statecache:put: 1' 'stateful).beam.metric:statecache:miss: 1' 'stateful).beam.metric:statecache:get_total: 120' 'stateful).beam.metric:statecache:size: 10' 'stateful).beam.metric:statecache:get: 12' 'stateful).beam.metric:statecache:evict: 0' 'stateful).beam.metric:statecache:capacity: 123' 'stateful).beam.metric:statecache:put_total: 10' 'stateful).beam.metric:statecache:evict_total: 0' 'counter: 110' 'stateful).beam.metric:statecache:miss_total: 10' 'stateful).beam.metric:statecache:hit_total: 110'
> Stacktrace
> self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>
>     def test_flink_metrics(self):
>       """Run a simple DoFn that increments a counter and verifies state
>       caching metrics. Verifies that its expected value is written to a
>       temporary file by the FileReporter"""
>     
>       counter_name = 'elem_counter'
>       state_spec = userstate.BagStateSpec('state', VarIntCoder())
>     
>       class DoFn(beam.DoFn):
>         def __init__(self):
>           self.counter = Metrics.counter(self.__class__, counter_name)
>           _LOGGER.info('counter: %s' % self.counter.metric_name)
>     
>         def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
>           # Trigger materialization
>           list(state.read())
>           state.add(1)
>           self.counter.inc()
>     
>       options = self.create_options()
>       # Test only supports parallelism of 1
>       options._all_options['parallelism'] = 1
>       # Create multiple bundles to test cache metrics
>       options._all_options['max_bundle_size'] = 10
>       options._all_options['max_bundle_time_millis'] = 95130590130
>       experiments = options.view_as(DebugOptions).experiments or []
>       experiments.append('state_cache_size=123')
>       options.view_as(DebugOptions).experiments = experiments
>       with Pipeline(self.get_runner(), options) as p:
>         # pylint: disable=expression-not-assigned
>         (
>             p
>             | "create" >> beam.Create(list(range(0, 110)))
>             | "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
>             | "stateful" >> beam.ParDo(DoFn()))
>     
>       lines_expected = {'counter: 110'}
>       if options.view_as(StandardOptions).streaming:
>         lines_expected.update([
>             # Gauges for the last finished bundle
>             'stateful.beam.metric:statecache:capacity: 123',
>             'stateful.beam.metric:statecache:size: 10',
>             'stateful.beam.metric:statecache:get: 20',
>             'stateful.beam.metric:statecache:miss: 0',
>             'stateful.beam.metric:statecache:hit: 20',
>             'stateful.beam.metric:statecache:put: 0',
>             'stateful.beam.metric:statecache:evict: 0',
>             # Counters
>             'stateful.beam.metric:statecache:get_total: 220',
>             'stateful.beam.metric:statecache:miss_total: 10',
>             'stateful.beam.metric:statecache:hit_total: 210',
>             'stateful.beam.metric:statecache:put_total: 10',
>             'stateful.beam.metric:statecache:evict_total: 0',
>         ])
>       else:
>         # Batch has a different processing model. All values for
>         # a key are processed at once.
>         lines_expected.update([
>             # Gauges
>             'stateful).beam.metric:statecache:capacity: 123',
>             # For the first key, the cache token will not be set yet.
>             # It's lazily initialized after first access in StateRequestHandlers
>             'stateful).beam.metric:statecache:size: 10',
>             # We have 11 here because there are 110 / 10 elements per key
>             'stateful).beam.metric:statecache:get: 12',
>             'stateful).beam.metric:statecache:miss: 1',
>             'stateful).beam.metric:statecache:hit: 11',
>             # State is flushed back once per key
>             'stateful).beam.metric:statecache:put: 1',
>             'stateful).beam.metric:statecache:evict: 0',
>             # Counters
>             'stateful).beam.metric:statecache:get_total: 120',
>             'stateful).beam.metric:statecache:miss_total: 10',
>             'stateful).beam.metric:statecache:hit_total: 110',
>             'stateful).beam.metric:statecache:put_total: 10',
>             'stateful).beam.metric:statecache:evict_total: 0',
>         ])
>       lines_actual = set()
>       with open(self.test_metrics_path, 'r') as f:
>         for line in f:
>           for metric_str in lines_expected:
>             metric_name = metric_str.split()[0]
>             if metric_str in line:
>               lines_actual.add(metric_str)
>             elif metric_name in line:
>               lines_actual.add(line)
> >     self.assertSetEqual(lines_actual, lines_expected)
> E     AssertionError: Items in the second set but not the first:
> E     'stateful).beam.metric:statecache:hit: 11'
> E     'stateful).beam.metric:statecache:put: 1'
> E     'stateful).beam.metric:statecache:miss: 1'
> E     'stateful).beam.metric:statecache:get_total: 120'
> E     'stateful).beam.metric:statecache:size: 10'
> E     'stateful).beam.metric:statecache:get: 12'
> E     'stateful).beam.metric:statecache:evict: 0'
> E     'stateful).beam.metric:statecache:capacity: 123'
> E     'stateful).beam.metric:statecache:put_total: 10'
> E     'stateful).beam.metric:statecache:evict_total: 0'
> E     'counter: 110'
> E     'stateful).beam.metric:statecache:miss_total: 10'
> E     'stateful).beam.metric:statecache:hit_total: 110'
> apache_beam/runners/portability/flink_runner_test.py:390: AssertionError
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)