You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2021/09/09 01:42:00 UTC

[jira] [Created] (BEAM-12860) apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation is flaky

Valentyn Tymofieiev created BEAM-12860:
------------------------------------------

             Summary: apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation  is flaky
                 Key: BEAM-12860
                 URL: https://issues.apache.org/jira/browse/BEAM-12860
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Valentyn Tymofieiev


Sample error from:
https://ci-beam.apache.org/job/beam_PostCommit_Python37/4245/

{noformat}
Error Message
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Workflow failed.
Stacktrace
self = <apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT testMethod=test_aggregation>

    @pytest.mark.it_postcommit
    def test_aggregation(self):
      taxiride.run_aggregation_pipeline(
          self.test_pipeline,
          'gs://apache-beam-samples/nyc_taxi/2018/*.csv',
>         self.output_path)

apache_beam/examples/dataframe/taxiride_it_test.py:52: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/examples/dataframe/taxiride.py:42: in run_aggregation_pipeline
    agg.to_csv(output_path)
apache_beam/pipeline.py:586: in __exit__
    self.result = self.run()
apache_beam/testing/test_pipeline.py:114: in run
    False if self.not_use_test_runner_api else test_runner_api))
apache_beam/pipeline.py:541: in run
    self._options).run(False)
apache_beam/pipeline.py:565: in run
    return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline
    self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <DataflowPipelineResult <Job
 clientRequestId: '20210908003059981848-7794'
 createTime: '2021-09-08T00:31:08.694724Z'
...021-09-08T00:31:08.694724Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f3f4bcf06d8>
duration = None

    def wait_until_finish(self, duration=None):
      if not self.is_in_terminal_state():
        if not self.has_job:
          raise IOError('Failed to get the Dataflow job id.')
    
        thread = threading.Thread(
            target=DataflowRunner.poll_for_job_completion,
            args=(self._runner, self, duration))
    
        # Mark the thread as a daemon thread so a keyboard interrupt on the main
        # thread will terminate everything. This is also the reason we will not
        # use thread.join() to wait for the polling thread.
        thread.daemon = True
        thread.start()
        while thread.is_alive():
          time.sleep(5.0)
    
        # TODO: Merge the termination code in poll_for_job_completion and
        # is_in_terminal_state.
        terminated = self.is_in_terminal_state()
        assert duration or terminated, (
            'Job did not reach to a terminal state after waiting indefinitely.')
    
        if terminated and self.state != PipelineState.DONE:
          # TODO(BEAM-1290): Consider converting this to an error log based on
          # theresolution of the issue.
          raise DataflowRuntimeException(
              'Dataflow pipeline failed. State: %s, Error:\n%s' %
              (self.state, getattr(self._runner, 'last_error_msg', None)),
>             self)
E         apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E         Workflow failed.

apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)