Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2022/01/13 22:07:00 UTC

[jira] [Updated] (BEAM-13234) Flake in StreamingWordCountIT.test_streaming_wordcount_it

     [ https://issues.apache.org/jira/browse/BEAM-13234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-13234:
-----------------------------------
    Status: Open  (was: Triage Needed)

> Flake in StreamingWordCountIT.test_streaming_wordcount_it
> ---------------------------------------------------------
>
>                 Key: BEAM-13234
>                 URL: https://issues.apache.org/jira/browse/BEAM-13234
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Valentyn Tymofieiev
>            Priority: P1
>              Labels: flake
>
> https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/20516/consoleFull
> {noformat}
> 22:50:10 =================================== FAILURES ===================================
> 22:50:10 _______________ StreamingWordCountIT.test_streaming_wordcount_it _______________
> 22:50:10 [gw0] linux -- Python 3.7.10 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
> 22:50:10 
> 22:50:10 self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
> 22:50:10 
> 22:50:10     @pytest.mark.it_postcommit
> 22:50:10     def test_streaming_wordcount_it(self):
> 22:50:10       # Build expected dataset.
> 22:50:10       expected_msg = [('%d: 1' % num).encode('utf-8')
> 22:50:10                       for num in range(DEFAULT_INPUT_NUMBERS)]
> 22:50:10     
> 22:50:10       # Set extra options to the pipeline for test purpose
> 22:50:10       state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
> 22:50:10       pubsub_msg_verifier = PubSubMessageMatcher(
> 22:50:10           self.project, self.output_sub.name, expected_msg, timeout=400)
> 22:50:10       extra_opts = {
> 22:50:10           'input_subscription': self.input_sub.name,
> 22:50:10           'output_topic': self.output_topic.name,
> 22:50:10           'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
> 22:50:10           'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
> 22:50:10       }
> 22:50:10     
> 22:50:10       # Generate input data and inject to PubSub.
> 22:50:10       self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
> 22:50:10     
> 22:50:10       # Get pipeline options from command argument: --test-pipeline-options,
> 22:50:10       # and start pipeline job by calling pipeline main function.
> 22:50:10       streaming_wordcount.run(
> 22:50:10           self.test_pipeline.get_full_options_as_args(**extra_opts),
> 22:50:10 >         save_main_session=False)
> 22:50:10 
> 22:50:10 apache_beam/examples/streaming_wordcount_it_test.py:104: 
> 22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 22:50:10 apache_beam/examples/streaming_wordcount.py:103: in run
> 22:50:10     output | beam.io.WriteToPubSub(known_args.output_topic)
> 22:50:10 apache_beam/pipeline.py:596: in __exit__
> 22:50:10     self.result = self.run()
> 22:50:10 apache_beam/pipeline.py:573: in run
> 22:50:10     return self.runner.run_pipeline(self, self._options)
> 22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 22:50:10 
> 22:50:10 self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7f1ac01efc90>
> 22:50:10 pipeline = <apache_beam.pipeline.Pipeline object at 0x7f1afd515190>
> 22:50:10 options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7f1ac0298490>
> 22:50:10 
> 22:50:10     def run_pipeline(self, pipeline, options):
> 22:50:10       """Execute test pipeline and verify test matcher"""
> 22:50:10       test_options = options.view_as(TestOptions)
> 22:50:10       on_success_matcher = test_options.on_success_matcher
> 22:50:10       wait_duration = test_options.wait_until_finish_duration
> 22:50:10       is_streaming = options.view_as(StandardOptions).streaming
> 22:50:10     
> 22:50:10       # [BEAM-1889] Do not send this to remote workers also, there is no need to
> 22:50:10       # send this option to remote executors.
> 22:50:10       test_options.on_success_matcher = None
> 22:50:10     
> 22:50:10       self.result = super().run_pipeline(pipeline, options)
> 22:50:10       if self.result.has_job:
> 22:50:10         # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
> 22:50:10         # in some cases.
> 22:50:10         print('Worker logs: %s' % self.build_console_url(options))
> 22:50:10     
> 22:50:10       try:
> 22:50:10         self.wait_until_in_state(PipelineState.RUNNING)
> 22:50:10     
> 22:50:10         if is_streaming and not wait_duration:
> 22:50:10           _LOGGER.warning('Waiting indefinitely for streaming job.')
> 22:50:10         self.result.wait_until_finish(duration=wait_duration)
> 22:50:10     
> 22:50:10         if on_success_matcher:
> 22:50:10           from hamcrest import assert_that as hc_assert_that
> 22:50:10 >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
> 22:50:10 E         AssertionError: 
> 22:50:10 E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
> 22:50:10 E              but: Expected 500 messages. Got 528 messages. Diffs (item, count):
> 22:50:10 E           Expected but not in actual: dict_items([(b'18: 1', 1), (b'23: 1', 1), (b'152: 1', 1), (b'162: 1', 1), (b'168: 1', 1), (b'184: 1', 1), (b'206: 1', 1), (b'208: 1', 1), (b'215: 1', 1), (b'247: 1', 1), (b'255: 1', 1), (b'265: 1', 1), (b'276: 1', 1), (b'278: 1', 1), (b'294: 1', 1), (b'350: 1', 1), (b'356: 1', 1), (b'395: 1', 1), (b'428: 1', 1), (b'450: 1', 1), (b'474: 1', 1)])
> 22:50:10 E           Unexpected: dict_items([(b'384: 1', 1), (b'237: 1', 1), (b'166: 1', 1), (b'262: 1', 1), (b'5: 1', 1), (b'13: 1', 1), (b'437: 1', 1), (b'263: 1', 1), (b'423: 1', 1), (b'317: 1', 1), (b'447: 1', 1), (b'125: 1', 1), (b'270: 1', 1), (b'116: 1', 1), (b'102: 1', 1), (b'326: 1', 1), (b'21: 1', 1), (b'244: 1', 1), (b'400: 1', 1), (b'117: 1', 1), (b'393: 1', 1), (b'225: 1', 1), (b'187: 1', 1), (b'210: 1', 1), (b'258: 1', 1), (b'226: 1', 1), (b'127: 1', 1), (b'84: 1', 1), (b'182: 1', 1), (b'373: 1', 1), (b'104: 1', 1), (b'382: 1', 1), (b'295: 1', 1), (b'325: 1', 1), (b'113: 1', 1), (b'470: 1', 1), (b'14: 1', 1), (b'353: 1', 1), (b'333: 1', 1), (b'413: 1', 1), (b'445: 1', 1), (b'115: 1', 1), (b'109: 1', 1), (b'386: 1', 1), (b'274: 1', 1), (b'303: 1', 1), (b'77: 1', 1), (b'455: 1', 1), (b'223: 1', 1)])
> 22:50:10 
> 22:50:10 apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError
> 22:50:10 ------------------------------ Captured log call -------------------------------
> {noformat}
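> For reference, the assertion above is a multiset comparison of expected vs. received Pub/Sub payloads. The following is a minimal sketch, not the actual {{PubSubMessageMatcher}} implementation (names are illustrative), of how the "Expected but not in actual" / "Unexpected" diff in the failure can be reproduced:
> {code:python}
> # Minimal sketch (not the Beam PubSubMessageMatcher code) of the multiset
> # comparison behind the "Expected but not in actual" / "Unexpected" diff above.
> from collections import Counter
> 
> DEFAULT_INPUT_NUMBERS = 500  # matches the 500 expected messages in the failure
> 
> # Expected payloads, built the same way as in the test: b'0: 1' .. b'499: 1'.
> expected = Counter(
>     ('%d: 1' % num).encode('utf-8') for num in range(DEFAULT_INPUT_NUMBERS))
> 
> def diff_messages(actual_payloads):
>     """Return (missing, unexpected) payload counts, as printed in the error."""
>     actual = Counter(actual_payloads)
>     missing = expected - actual      # expected but not received in time
>     unexpected = actual - expected   # received but never expected
>     return dict(missing), dict(unexpected)
> {code}
> Since the output subscription should hold exactly 500 distinct payloads, duplicate deliveries or leftover messages from an earlier run would surface as the extra "Unexpected" items (528 received), while the 400s timeout can still expire before every expected payload arrives, producing the "Expected but not in actual" list.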



--
This message was sent by Atlassian Jira
(v8.20.1#820001)