Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/05/15 18:02:02 UTC

[jira] [Updated] (BEAM-9118) apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

     [ https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-9118:
----------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Resolved)

Hello! Due to a bug in our Jira configuration, this issue had status:Resolved but resolution:Unresolved.

I am bulk editing these issues to have resolution:Fixed.

If a different resolution is appropriate, please change it. To do this, click the "Resolve" button (you can do this even for closed issues) and set the Resolution field to the right value.

> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Priority: P1
>              Labels: beam-fixit, flake
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Sample errors:
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373
> {noformat}
> 14:30:12  self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_unfusable_side_inputs>
> 14:30:12  
> 14:30:12      def test_pardo_unfusable_side_inputs(self):
> 14:30:12        def cross_product(elem, sides):
> 14:30:12          for side in sides:
> 14:30:12            yield elem, side
> 14:30:12        with self.create_pipeline() as p:
> 14:30:12          pcoll = p | beam.Create(['a', 'b'])
> 14:30:12          assert_that(
> 14:30:12              pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(pcoll)),
> 14:30:12              equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
> 14:30:12      
> 14:30:12        with self.create_pipeline() as p:
> 14:30:12          pcoll = p | beam.Create(['a', 'b'])
> 14:30:12          derived = ((pcoll,) | beam.Flatten()
> 14:30:12                     | beam.Map(lambda x: (x, x))
> 14:30:12                     | beam.GroupByKey()
> 14:30:12                     | 'Unkey' >> beam.Map(lambda kv: kv[0]))
> 14:30:12          assert_that(
> 14:30:12              pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(derived)),
> 14:30:12  >           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
> 14:30:12  
> 14:30:12  apache_beam/runners/portability/fn_api_runner_test.py:258: 
> 14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 14:30:12  apache_beam/pipeline.py:481: in __exit__
> 14:30:12      self.run().wait_until_finish()
> 14:30:12  apache_beam/runners/portability/portable_runner.py:445: in wait_until_finish
> 14:30:12      for state_response in self._state_stream:
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416: in __next__
> 14:30:12      return self._next()
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694: in _next
> 14:30:12      _common.wait(self._state.condition.wait, _response_ready)
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140: in wait
> 14:30:12      _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105: in _wait_once
> 14:30:12      wait_fn(timeout=timeout)
> 14:30:12  /usr/lib/python3.6/threading.py:299: in wait
> 14:30:12      gotit = waiter.acquire(True, timeout)
> 14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 14:30:12  
> 14:30:12  signum = 14, frame = <frame object at 0x31daf68>
> 14:30:12  
> 14:30:12      def handler(signum, frame):
> 14:30:12        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
> 14:30:12        print('=' * 20, msg, '=' * 20)
> 14:30:12        traceback.print_stack(frame)
> 14:30:12        threads_by_id = {th.ident: th for th in threading.enumerate()}
> 14:30:12        for thread_id, stack in sys._current_frames().items():
> 14:30:12          th = threads_by_id.get(thread_id)
> 14:30:12          print()
> 14:30:12          print('# Thread:', th or thread_id)
> 14:30:12          traceback.print_stack(stack)
> 14:30:12  >     raise BaseException(msg)
> 14:30:12  E     BaseException: Timed out after 60 seconds.
> 14:30:12  
> 14:30:12  apache_beam/runners/portability/portable_runner_test.py:77: BaseException
> {noformat}
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/
> {noformat}
> 09:06:01  self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocessesAndMultiWorkers testMethod=test_assert_that>
> 09:06:01  
> 09:06:01      def test_assert_that(self):
> 09:06:01        # TODO: figure out a way for fn_api_runner to parse and raise the
> 09:06:01        # underlying exception.
> 09:06:01        with self.assertRaisesRegex(Exception, 'Failed assert'):
> 09:06:01          with self.create_pipeline() as p:
> 09:06:01  >         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
> 09:06:01  E         AssertionError: "Failed assert" does not match "Pipeline timed out waiting for job service subprocess."
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)