Posted to commits@airflow.apache.org by "Tomasz Urbaszek (JIRA)" <ji...@apache.org> on 2019/07/18 16:59:00 UTC

[jira] [Commented] (AIRFLOW-2549) GCP DataProc Workflow Template operators report success when jobs fail

    [ https://issues.apache.org/jira/browse/AIRFLOW-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16888158#comment-16888158 ] 

Tomasz Urbaszek commented on AIRFLOW-2549:
------------------------------------------

Could not replicate the problem.

Steps I took:
- create a Dataproc workflow template `test-workflow` with two jobs:
 - `hadoop_job` - expected to fail
 - `pig_job` - expected to succeed
- create an example DAG with `DataprocWorkflowTemplateInstantiateOperator` to instantiate `test-workflow`
- wait for the output:


{code:java}
[2019-07-18 08:38:25,294] {taskinstance.py:614} INFO - Dependencies all met for <TaskInstance: example_gcp_dataproc_template.instantiate_task 2019-01-01T00:00:00+00:00 [None]>
[2019-07-18 08:38:25,300] {taskinstance.py:614} INFO - Dependencies all met for <TaskInstance: example_gcp_dataproc_template.instantiate_task 2019-01-01T00:00:00+00:00 [None]>
[2019-07-18 08:38:25,300] {taskinstance.py:832} INFO -
--------------------------------------------------------------------------------
[2019-07-18 08:38:25,300] {taskinstance.py:833} INFO - Starting attempt 1 of 1
[2019-07-18 08:38:25,300] {taskinstance.py:834} INFO -
--------------------------------------------------------------------------------
[2019-07-18 08:38:25,301] {taskinstance.py:853} INFO - Executing <Task(DataprocWorkflowTemplateInstantiateOperator): instantiate_task> on 2019-01-01T00:00:00+00:00
[2019-07-18 08:38:25,318] {dataproc_operator.py:1098} INFO - Instantiating Template: test-workflow
[2019-07-18 08:38:25,318] {gcp_api_base_hook.py:98} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-07-18 08:38:25,325] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataproc/v1beta2/rest
[2019-07-18 08:38:25,644] {discovery.py:873} INFO - URL being requested: POST https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/workflowTemplates/test-workflow:instantiate?alt=json
[2019-07-18 08:38:27,613] {gcp_api_base_hook.py:98} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-07-18 08:38:27,622] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataproc/v1beta2/rest
[2019-07-18 08:38:27,900] {gcp_dataproc_hook.py:363} INFO - Waiting for Dataproc Operation projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c to finish
[2019-07-18 08:38:37,879] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:38:48,782] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:38:59,586] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:39:09,866] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:39:20,153] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:39:30,731] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:39:41,195] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:39:52,479] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:40:02,884] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:40:13,142] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:40:23,924] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:40:34,642] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:40:44,928] {discovery.py:873} INFO - URL being requested: GET https://dataproc.googleapis.com/v1beta2/projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c?alt=json
[2019-07-18 08:40:45,209] {gcp_dataproc_hook.py:389} WARNING - Dataproc Operation projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c failed with error: Step 'hadoop_job' (hadoop_job-a342p4tbavep6) failed
[2019-07-18 08:40:45,210] {taskinstance.py:1045} ERROR - Google Dataproc Operation projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c failed: Step 'hadoop_job' (hadoop_job-a342p4tbavep6) failed
Traceback (most recent call last):
  File "/workspace/airflow/models/taskinstance.py", line 920, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/workspace/airflow/contrib/operators/dataproc_operator.py", line 66, in execute
    self.hook.wait(self.start())
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 497, in wait
    submitted.wait_for_done()
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 373, in wait_for_done
    if self._check_done():
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 390, in _check_done
    self._raise_error()
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 399, in _raise_error
    (self.operation_name, self.operation['error']['message']))
Exception: Google Dataproc Operation projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c failed: Step 'hadoop_job' (hadoop_job-a342p4tbavep6) failed
[2019-07-18 08:40:45,232] {taskinstance.py:1076} INFO - Marking task as FAILED.
Traceback (most recent call last):
  File "/root/.virtualenvs/airflow36/bin/airflow", line 7, in <module>
    exec(compile(f.read(), __file__, 'exec'))
  File "/workspace/airflow/bin/airflow", line 32, in <module>
    args.func(args)
  File "/workspace/airflow/utils/cli.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/workspace/airflow/bin/cli.py", line 685, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/workspace/airflow/utils/db.py", line 69, in wrapper
    return func(*args, **kwargs)
  File "/workspace/airflow/models/taskinstance.py", line 1007, in run
    session=session)
  File "/workspace/airflow/utils/db.py", line 65, in wrapper
    return func(*args, **kwargs)
  File "/workspace/airflow/models/taskinstance.py", line 920, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/workspace/airflow/contrib/operators/dataproc_operator.py", line 66, in execute
    self.hook.wait(self.start())
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 497, in wait
    submitted.wait_for_done()
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 373, in wait_for_done
    if self._check_done():
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 390, in _check_done
    self._raise_error()
  File "/workspace/airflow/contrib/hooks/gcp_dataproc_hook.py", line 399, in _raise_error
    (self.operation_name, self.operation['error']['message']))
Exception: Google Dataproc Operation projects/polidea-airflow/regions/global/operations/c0e2c4b0-1408-3976-a139-cb1b32782d1c failed: Step 'hadoop_job' (hadoop_job-a342p4tbavep6) failed
{code}
The last line contains information about the workflow's failure. Moreover, it contains the name of the failed job (`hadoop_job`), so the operator did fail the task as expected.
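
For reference, a minimal DAG along these lines exercises the operator (a sketch against the Airflow 1.10 contrib path; the project id and start date are illustrative placeholders, and the `test-workflow` template is assumed to already exist):

{code:python}
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataprocWorkflowTemplateInstantiateOperator,
)

# Sketch of a DAG that instantiates an existing Dataproc workflow template.
# project_id and start_date are placeholders, not the values from my test run.
with DAG(
    dag_id="example_gcp_dataproc_template",
    schedule_interval=None,
    start_date=datetime(2019, 1, 1),
) as dag:
    instantiate_task = DataprocWorkflowTemplateInstantiateOperator(
        task_id="instantiate_task",
        project_id="my-project",      # GCP project that owns the template
        region="global",              # region used in the logs above
        template_id="test-workflow",  # template containing hadoop_job and pig_job
    )
{code}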

 

[~kamil.bregula] what do you think?

> GCP DataProc Workflow Template operators report success when jobs fail
> ----------------------------------------------------------------------
>
>                 Key: AIRFLOW-2549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2549
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Kevin McHale
>            Assignee: Kevin McHale
>            Priority: Major
>
> cc: [~DanSedov] [~fenglu]
>  
> The Google DataProc workflow template operators use the [_DataProcOperator|https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/gcp_dataproc_hook.py#L149] class to analyze the outcome of the workflow template instantiation, but that class does not properly detect errors.
>  
> Specifically, when any one of the jobs in the template fails, the operator should report an error, but it always reports success because it does not properly analyze the API responses.
>  
> The outcomes of individual jobs are indicated under the {{metadata.graph.nodes}} path of the API response, and this field needs to be checked for errors. However, the existing {{_DataProcOperator}} class only checks for the existence of the {{done}} and {{error}} fields.
>  
> Below is an example of the API response object for a failed DataProc workflow template operation, to illustrate this.  I pulled this directly from the DataProc API and anonymized it:
> {code:java}
> {
>   "response": {
>     "@type": "type.googleapis.com/google.protobuf.Empty"
>   },
>   "done": true,
>   "name": "projects/my-project/regions/us-central1/operations/dddddddd-dddd-dddd-dddd-dddddddddddd",
>   "metadata": {
>     "createCluster": {
>       "done": true,
>       "operationId": "projects/my-project/regions/us-central1/operations/1111111-0000-aaaa-bbbb-ffffffffffff"
>     },
>     "clusterName": "fake-dataproc-cluster",
>     "graph": {
>       "nodes": [
>         {
>           "state": "FAILED",
>           "jobId": "my-job-abcdefghijklm",
>           "stepId": "my-job",
>           "error": "Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found in 'gs://dataproc-00000000-0000-0000-0000-000000000000-us-central1/google-cloud-dataproc-metainfo/cccccccc-cccc-cccc-cccc-cccccccccccc/jobs/my-job-abcdefghijklm/driveroutput'."
>         }
>       ]
>     },
>     "state": "DONE",
>     "deleteCluster": {
>       "done": true,
>       "operationId": "projects/my-project/regions/us-central1/operations/1111111-1111-aaaa-bbbb-ffffffffffff"
>     },
>     "@type": "type.googleapis.com/google.cloud.dataproc.v1beta2.WorkflowMetadata"
>   }
> }
> {code}
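> A small sketch of the kind of per-node check described above, written against the response shape shown (field names taken from the example; this is illustrative only, not the existing hook code):
> {code:python}
> def find_failed_steps(operation):
>     """Return (stepId, error) for every failed node of a workflow
>     template operation response shaped like the example above."""
>     nodes = operation.get("metadata", {}).get("graph", {}).get("nodes", [])
>     return [
>         (node.get("stepId"), node.get("error"))
>         for node in nodes
>         if node.get("state") == "FAILED" or "error" in node
>     ]
> {code}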
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)