You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Kyle Weaver (Jira)" <ji...@apache.org> on 2020/08/20 18:19:00 UTC
[jira] [Updated] (BEAM-9118)
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
is flaky
[ https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kyle Weaver updated BEAM-9118:
------------------------------
Status: Resolved (was: Open)
> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky
> ------------------------------------------------------------------------------------------------
>
> Key: BEAM-9118
> URL: https://issues.apache.org/jira/browse/BEAM-9118
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Valentyn Tymofieiev
> Priority: P1
> Labels: beam-fixit, flake
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Sample errors:
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373
> {noformat}
> 4:30:12 self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_unfusable_side_inputs>
> 14:30:12
> 14:30:12 def test_pardo_unfusable_side_inputs(self):
> 14:30:12 def cross_product(elem, sides):
> 14:30:12 for side in sides:
> 14:30:12 yield elem, side
> 14:30:12 with self.create_pipeline() as p:
> 14:30:12 pcoll = p | beam.Create(['a', 'b'])
> 14:30:12 assert_that(
> 14:30:12 pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(pcoll)),
> 14:30:12 equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
> 14:30:12
> 14:30:12 with self.create_pipeline() as p:
> 14:30:12 pcoll = p | beam.Create(['a', 'b'])
> 14:30:12 derived = ((pcoll,) | beam.Flatten()
> 14:30:12 | beam.Map(lambda x: (x, x))
> 14:30:12 | beam.GroupByKey()
> 14:30:12 | 'Unkey' >> beam.Map(lambda kv: kv[0]))
> 14:30:12 assert_that(
> 14:30:12 pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(derived)),
> 14:30:12 > equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
> 14:30:12
> 14:30:12 apache_beam/runners/portability/fn_api_runner_test.py:258:
> 14:30:12 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 14:30:12 apache_beam/pipeline.py:481: in __exit__
> 14:30:12 self.run().wait_until_finish()
> 14:30:12 apache_beam/runners/portability/portable_runner.py:445: in wait_until_finish
> 14:30:12 for state_response in self._state_stream:
> 14:30:12 target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416: in __next__
> 14:30:12 return self._next()
> 14:30:12 target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694: in _next
> 14:30:12 _common.wait(self._state.condition.wait, _response_ready)
> 14:30:12 target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140: in wait
> 14:30:12 _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
> 14:30:12 target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105: in _wait_once
> 14:30:12 wait_fn(timeout=timeout)
> 14:30:12 /usr/lib/python3.6/threading.py:299: in wait
> 14:30:12 gotit = waiter.acquire(True, timeout)
> 14:30:12 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 14:30:12
> 14:30:12 signum = 14, frame = <frame object at 0x31daf68>
> 14:30:12
> 14:30:12 def handler(signum, frame):
> 14:30:12 msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
> 14:30:12 print('=' * 20, msg, '=' * 20)
> 14:30:12 traceback.print_stack(frame)
> 14:30:12 threads_by_id = {th.ident: th for th in threading.enumerate()}
> 14:30:12 for thread_id, stack in sys._current_frames().items():
> 14:30:12 th = threads_by_id.get(thread_id)
> 14:30:12 print()
> 14:30:12 print('# Thread:', th or thread_id)
> 14:30:12 traceback.print_stack(stack)
> 14:30:12 > raise BaseException(msg)
> 14:30:12 E BaseException: Timed out after 60 seconds.
> 14:30:12
> 14:30:12 apache_beam/runners/portability/portable_runner_test.py:77: BaseException
> {noformat}
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/
> {noformat}
> 09:06:01 self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocessesAndMultiWorkers testMethod=test_assert_that>
> 09:06:01
> 09:06:01 def test_assert_that(self):
> 09:06:01 # TODO: figure out a way for fn_api_runner to parse and raise the
> 09:06:01 # underlying exception.
> 09:06:01 with self.assertRaisesRegex(Exception, 'Failed assert'):
> 09:06:01 with self.create_pipeline() as p:
> 09:06:01 > assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
> 09:06:01 E AssertionError: "Failed assert" does not match "Pipeline timed out waiting for job service subprocess."
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)