You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2021/03/18 19:28:00 UTC

[jira] [Created] (BEAM-12019) apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky

Valentyn Tymofieiev created BEAM-12019:
------------------------------------------

             Summary: apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
                 Key: BEAM-12019
                 URL: https://issues.apache.org/jira/browse/BEAM-12019
             Project: Beam
          Issue Type: Improvement
          Components: runner-flink
            Reporter: Valentyn Tymofieiev


Sample error:
https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/

{noformat}
Error Message
AssertionError: Items in the second set but not the first: 'stateful).beam.metric:statecache:hit: 11' 'stateful).beam.metric:statecache:put: 1' 'stateful).beam.metric:statecache:miss: 1' 'stateful).beam.metric:statecache:get_total: 120' 'stateful).beam.metric:statecache:size: 10' 'stateful).beam.metric:statecache:get: 12' 'stateful).beam.metric:statecache:evict: 0' 'stateful).beam.metric:statecache:capacity: 123' 'stateful).beam.metric:statecache:put_total: 10' 'stateful).beam.metric:statecache:evict_total: 0' 'counter: 110' 'stateful).beam.metric:statecache:miss_total: 10' 'stateful).beam.metric:statecache:hit_total: 110'
Stacktrace
self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized testMethod=test_flink_metrics>

    def test_flink_metrics(self):
      """Run a simple DoFn that increments a counter and verifies state
      caching metrics. Verifies that its expected value is written to a
      temporary file by the FileReporter"""
    
      counter_name = 'elem_counter'
      state_spec = userstate.BagStateSpec('state', VarIntCoder())
    
      class DoFn(beam.DoFn):
        def __init__(self):
          self.counter = Metrics.counter(self.__class__, counter_name)
          _LOGGER.info('counter: %s' % self.counter.metric_name)
    
        def process(self, kv, state=beam.DoFn.StateParam(state_spec)):
          # Trigger materialization
          list(state.read())
          state.add(1)
          self.counter.inc()
    
      options = self.create_options()
      # Test only supports parallelism of 1
      options._all_options['parallelism'] = 1
      # Create multiple bundles to test cache metrics
      options._all_options['max_bundle_size'] = 10
      options._all_options['max_bundle_time_millis'] = 95130590130
      experiments = options.view_as(DebugOptions).experiments or []
      experiments.append('state_cache_size=123')
      options.view_as(DebugOptions).experiments = experiments
      with Pipeline(self.get_runner(), options) as p:
        # pylint: disable=expression-not-assigned
        (
            p
            | "create" >> beam.Create(list(range(0, 110)))
            | "mapper" >> beam.Map(lambda x: (x % 10, 'val'))
            | "stateful" >> beam.ParDo(DoFn()))
    
      lines_expected = {'counter: 110'}
      if options.view_as(StandardOptions).streaming:
        lines_expected.update([
            # Gauges for the last finished bundle
            'stateful.beam.metric:statecache:capacity: 123',
            'stateful.beam.metric:statecache:size: 10',
            'stateful.beam.metric:statecache:get: 20',
            'stateful.beam.metric:statecache:miss: 0',
            'stateful.beam.metric:statecache:hit: 20',
            'stateful.beam.metric:statecache:put: 0',
            'stateful.beam.metric:statecache:evict: 0',
            # Counters
            'stateful.beam.metric:statecache:get_total: 220',
            'stateful.beam.metric:statecache:miss_total: 10',
            'stateful.beam.metric:statecache:hit_total: 210',
            'stateful.beam.metric:statecache:put_total: 10',
            'stateful.beam.metric:statecache:evict_total: 0',
        ])
      else:
        # Batch has a different processing model. All values for
        # a key are processed at once.
        lines_expected.update([
            # Gauges
            'stateful).beam.metric:statecache:capacity: 123',
            # For the first key, the cache token will not be set yet.
            # It's lazily initialized after first access in StateRequestHandlers
            'stateful).beam.metric:statecache:size: 10',
            # We have 11 here because there are 110 / 10 elements per key
            'stateful).beam.metric:statecache:get: 12',
            'stateful).beam.metric:statecache:miss: 1',
            'stateful).beam.metric:statecache:hit: 11',
            # State is flushed back once per key
            'stateful).beam.metric:statecache:put: 1',
            'stateful).beam.metric:statecache:evict: 0',
            # Counters
            'stateful).beam.metric:statecache:get_total: 120',
            'stateful).beam.metric:statecache:miss_total: 10',
            'stateful).beam.metric:statecache:hit_total: 110',
            'stateful).beam.metric:statecache:put_total: 10',
            'stateful).beam.metric:statecache:evict_total: 0',
        ])
      lines_actual = set()
      with open(self.test_metrics_path, 'r') as f:
        for line in f:
          for metric_str in lines_expected:
            metric_name = metric_str.split()[0]
            if metric_str in line:
              lines_actual.add(metric_str)
            elif metric_name in line:
              lines_actual.add(line)
>     self.assertSetEqual(lines_actual, lines_expected)
E     AssertionError: Items in the second set but not the first:
E     'stateful).beam.metric:statecache:hit: 11'
E     'stateful).beam.metric:statecache:put: 1'
E     'stateful).beam.metric:statecache:miss: 1'
E     'stateful).beam.metric:statecache:get_total: 120'
E     'stateful).beam.metric:statecache:size: 10'
E     'stateful).beam.metric:statecache:get: 12'
E     'stateful).beam.metric:statecache:evict: 0'
E     'stateful).beam.metric:statecache:capacity: 123'
E     'stateful).beam.metric:statecache:put_total: 10'
E     'stateful).beam.metric:statecache:evict_total: 0'
E     'counter: 110'
E     'stateful).beam.metric:statecache:miss_total: 10'
E     'stateful).beam.metric:statecache:hit_total: 110'

apache_beam/runners/portability/flink_runner_test.py:390: AssertionError

{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)