Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2021/09/09 01:42:00 UTC
[jira] [Created] (BEAM-12860) apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation is flaky
Valentyn Tymofieiev created BEAM-12860:
------------------------------------------
Summary: apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation is flaky
Key: BEAM-12860
URL: https://issues.apache.org/jira/browse/BEAM-12860
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Valentyn Tymofieiev
Sample error from:
https://ci-beam.apache.org/job/beam_PostCommit_Python37/4245/
{noformat}
Error Message
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Workflow failed.
Stacktrace
self = <apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT testMethod=test_aggregation>
    @pytest.mark.it_postcommit
    def test_aggregation(self):
      taxiride.run_aggregation_pipeline(
          self.test_pipeline,
          'gs://apache-beam-samples/nyc_taxi/2018/*.csv',
>         self.output_path)
apache_beam/examples/dataframe/taxiride_it_test.py:52:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/dataframe/taxiride.py:42: in run_aggregation_pipeline
agg.to_csv(output_path)
apache_beam/pipeline.py:586: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:114: in run
False if self.not_use_test_runner_api else test_runner_api))
apache_beam/pipeline.py:541: in run
self._options).run(False)
apache_beam/pipeline.py:565: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline
self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DataflowPipelineResult <Job
clientRequestId: '20210908003059981848-7794'
createTime: '2021-09-08T00:31:08.694724Z'
...021-09-08T00:31:08.694724Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f3f4bcf06d8>
duration = None
    def wait_until_finish(self, duration=None):
      if not self.is_in_terminal_state():
        if not self.has_job:
          raise IOError('Failed to get the Dataflow job id.')
        thread = threading.Thread(
            target=DataflowRunner.poll_for_job_completion,
            args=(self._runner, self, duration))
        # Mark the thread as a daemon thread so a keyboard interrupt on the main
        # thread will terminate everything. This is also the reason we will not
        # use thread.join() to wait for the polling thread.
        thread.daemon = True
        thread.start()
        while thread.is_alive():
          time.sleep(5.0)
        # TODO: Merge the termination code in poll_for_job_completion and
        # is_in_terminal_state.
        terminated = self.is_in_terminal_state()
        assert duration or terminated, (
            'Job did not reach to a terminal state after waiting indefinitely.')
        if terminated and self.state != PipelineState.DONE:
          # TODO(BEAM-1290): Consider converting this to an error log based on
          # the resolution of the issue.
          raise DataflowRuntimeException(
              'Dataflow pipeline failed. State: %s, Error:\n%s' %
              (self.state, getattr(self._runner, 'last_error_msg', None)),
>             self)
E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E Workflow failed.
apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException
{noformat}
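The trace shows how a failed Dataflow job becomes a test failure: wait_until_finish polls the job from a daemon thread, and once the job reaches a terminal state other than DONE it raises DataflowRuntimeException carrying the runner's last error message (in this run only "Workflow failed."). Below is a minimal, self-contained sketch of that control flow with no Beam dependency. FakeJob, PipelineFailedError and this simplified wait_until_finish are hypothetical stand-ins, not Beam APIs, and the sketch omits the duration/timeout handling of the real method.

```python
import threading
import time


class PipelineFailedError(RuntimeError):
    """Hypothetical stand-in for DataflowRuntimeException."""


class FakeJob:
    """Hypothetical job whose state becomes terminal after a short delay."""

    TERMINAL_STATES = {'DONE', 'FAILED', 'CANCELLED'}

    def __init__(self, final_state, delay=0.2):
        self.state = 'RUNNING'
        self._final_state = final_state
        self._delay = delay
        # Mirrors the runner's last_error_msg seen in the trace.
        self.last_error_msg = (
            'Workflow failed.' if final_state == 'FAILED' else None)

    def poll_for_completion(self):
        # Stands in for DataflowRunner.poll_for_job_completion: block until
        # the (simulated) service reports a terminal state.
        time.sleep(self._delay)
        self.state = self._final_state

    def is_in_terminal_state(self):
        return self.state in self.TERMINAL_STATES


def wait_until_finish(job, poll_interval=0.05):
    """Simplified sketch of the control flow shown in the stack trace."""
    if not job.is_in_terminal_state():
        # Daemon thread, so a KeyboardInterrupt in the main thread can end
        # the process; the main thread polls is_alive() instead of join().
        thread = threading.Thread(target=job.poll_for_completion)
        thread.daemon = True
        thread.start()
        while thread.is_alive():
            time.sleep(poll_interval)
        # Any terminal state other than DONE (e.g. FAILED) is raised as an
        # exception, which the test then reports as a failure.
        if job.is_in_terminal_state() and job.state != 'DONE':
            raise PipelineFailedError(
                'Dataflow pipeline failed. State: %s, Error:\n%s' %
                (job.state, job.last_error_msg))
    return job.state


if __name__ == '__main__':
    print(wait_until_finish(FakeJob('DONE')))
    try:
        wait_until_finish(FakeJob('FAILED'))
    except PipelineFailedError as e:
        print(e)
```

Because the exception only carries the runner's last error message, a job that fails with a generic "Workflow failed." gives the test log little detail about the underlying cause.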
--
This message was sent by Atlassian Jira
(v8.3.4#803005)