Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2021/09/09 02:19:00 UTC
[jira] [Updated] (BEAM-12860) apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation is flaky
[ https://issues.apache.org/jira/browse/BEAM-12860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Valentyn Tymofieiev updated BEAM-12860:
---------------------------------------
Labels: flake (was: )
> apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation is flaky
> -------------------------------------------------------------------------------------
>
> Key: BEAM-12860
> URL: https://issues.apache.org/jira/browse/BEAM-12860
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Valentyn Tymofieiev
> Assignee: Brian Hulette
> Priority: P2
> Labels: flake
>
> Sample error from:
> https://ci-beam.apache.org/job/beam_PostCommit_Python37/4245/
> {noformat}
> Error Message
> apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Workflow failed.
> Stacktrace
> self = <apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT testMethod=test_aggregation>
>     @pytest.mark.it_postcommit
>     def test_aggregation(self):
>       taxiride.run_aggregation_pipeline(
>           self.test_pipeline,
>           'gs://apache-beam-samples/nyc_taxi/2018/*.csv',
> >         self.output_path)
> apache_beam/examples/dataframe/taxiride_it_test.py:52:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> apache_beam/examples/dataframe/taxiride.py:42: in run_aggregation_pipeline
>     agg.to_csv(output_path)
> apache_beam/pipeline.py:586: in __exit__
>     self.result = self.run()
> apache_beam/testing/test_pipeline.py:114: in run
>     False if self.not_use_test_runner_api else test_runner_api))
> apache_beam/pipeline.py:541: in run
>     self._options).run(False)
> apache_beam/pipeline.py:565: in run
>     return self.runner.run_pipeline(self, self._options)
> apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline
>     self.result.wait_until_finish(duration=wait_duration)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <DataflowPipelineResult <Job
> clientRequestId: '20210908003059981848-7794'
> createTime: '2021-09-08T00:31:08.694724Z'
> ...021-09-08T00:31:08.694724Z'
> steps: []
> tempFiles: []
> type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f3f4bcf06d8>
> duration = None
>     def wait_until_finish(self, duration=None):
>       if not self.is_in_terminal_state():
>         if not self.has_job:
>           raise IOError('Failed to get the Dataflow job id.')
>
>         thread = threading.Thread(
>             target=DataflowRunner.poll_for_job_completion,
>             args=(self._runner, self, duration))
>
>         # Mark the thread as a daemon thread so a keyboard interrupt on the main
>         # thread will terminate everything. This is also the reason we will not
>         # use thread.join() to wait for the polling thread.
>         thread.daemon = True
>         thread.start()
>         while thread.is_alive():
>           time.sleep(5.0)
>
>         # TODO: Merge the termination code in poll_for_job_completion and
>         # is_in_terminal_state.
>         terminated = self.is_in_terminal_state()
>         assert duration or terminated, (
>             'Job did not reach to a terminal state after waiting indefinitely.')
>
>         if terminated and self.state != PipelineState.DONE:
>           # TODO(BEAM-1290): Consider converting this to an error log based on
>           # the resolution of the issue.
>           raise DataflowRuntimeException(
>               'Dataflow pipeline failed. State: %s, Error:\n%s' %
>               (self.state, getattr(self._runner, 'last_error_msg', None)),
> >             self)
> E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> E Workflow failed.
> apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException
> {noformat}
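For context, the wait_until_finish code in the trace uses a daemon polling-thread pattern: the poller runs on a daemon thread so a keyboard interrupt on the main thread tears everything down, and the main thread spins on is_alive() instead of join(). A minimal self-contained sketch of that pattern, using hypothetical FakeJob / poll_for_job_completion stand-ins rather than Beam's actual classes:

```python
import threading
import time

class FakeJob:
    """Hypothetical stand-in for a remote job handle (not Beam code)."""
    def __init__(self):
        self.state = 'RUNNING'

def poll_for_job_completion(job):
    # Pretend to poll the service; flip to a terminal state after a delay.
    time.sleep(0.2)
    job.state = 'FAILED'

job = FakeJob()
thread = threading.Thread(target=poll_for_job_completion, args=(job,))
# Daemon thread: a Ctrl-C on the main thread terminates everything,
# which is also why we poll is_alive() below rather than call join().
thread.daemon = True
thread.start()
while thread.is_alive():
    time.sleep(0.05)

print(job.state)
```

In the flaky test above, the same loop observed a terminal FAILED state and raised DataflowRuntimeException.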
--
This message was sent by Atlassian Jira
(v8.3.4#803005)