You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/18 04:23:01 UTC

[jira] [Commented] (AIRFLOW-2549) GCP DataProc Workflow Template operators report success when jobs fail

    [ https://issues.apache.org/jira/browse/AIRFLOW-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723673#comment-16723673 ] 

ASF GitHub Bot commented on AIRFLOW-2549:
-----------------------------------------

stale[bot] closed pull request #3447: [AIRFLOW-2549] Fix DataProcOperation error-check
URL: https://github.com/apache/incubator-airflow/pull/3447
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index ce65b2b915..afa82ce5f9 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -175,21 +175,45 @@ def get(self):
         return self.operation
 
     def _check_done(self):
-        if 'done' in self.operation:
+        def _check_error():
+            """ Check the operation for errors.  Precondition is that the
+                operation must be marked as done already.
+            """
             if 'error' in self.operation:
-                self.log.warning(
-                    'Dataproc Operation %s failed with error: %s',
-                    self.operation_name, self.operation['error']['message'])
-                self._raise_error()
-            else:
-                self.log.info(
-                    'Dataproc Operation %s done', self.operation['name'])
-                return True
-        return False
+                return (True, self.operation['error']['message'])
+
+            # Dataproc workflow templates do not set the 'error' field when
+            # jobs fail; we have to examine the individual jobs for failures.
+            metadata = self.operation.get('metadata', {})
+            if not metadata.get('@type', '').endswith('WorkflowMetadata'):
+                return (False, None)
+
+            nodes = metadata.get('graph', {}).get('nodes', [])
+
+            error_nodes = [node for node in nodes if node.get('error')]
+
+            return (False, None) if not error_nodes else \
+                (True, str({
+                    node['jobId']: node['error'] for node in error_nodes}))
+
+        if not self.operation.get('done'):
+            # either the done field is not present, or it is false
+            return False
+
+        (operation_failed, error_message) = _check_error()
+        if operation_failed:
+            self.log.warning(
+                'Dataproc Operation %s failed with error: %s',
+                self.operation_name, error_message)
+            self._raise_error(error_message)
+        else:
+            self.log.info(
+                'Dataproc Operation %s done', self.operation_name)
+            return True
 
-    def _raise_error(self):
+    def _raise_error(self, error_message):
         raise Exception('Google Dataproc Operation %s failed: %s' %
-                        (self.operation_name, self.operation['error']['message']))
+                        (self.operation_name, error_message))
 
 
 class DataProcHook(GoogleCloudBaseHook):
diff --git a/tests/contrib/hooks/test_gcp_dataproc_hook.py b/tests/contrib/hooks/test_gcp_dataproc_hook.py
index f2629ff148..26f059d35c 100644
--- a/tests/contrib/hooks/test_gcp_dataproc_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataproc_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,7 @@
 #
 
 import unittest
-from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
+from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook, _DataProcOperation
 
 try:
     from unittest import mock
@@ -48,6 +48,98 @@ def setUp(self):
 
     @mock.patch(DATAPROC_STRING.format('_DataProcJob'))
     def test_submit(self, job_mock):
-      with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn', return_value=None)):
-        self.dataproc_hook.submit(PROJECT_ID, JOB)
-        job_mock.assert_called_once_with(mock.ANY, PROJECT_ID, JOB, REGION)
+        with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn',
+                                               return_value=None)):
+            self.dataproc_hook.submit(PROJECT_ID, JOB)
+            job_mock.assert_called_once_with(mock.ANY, PROJECT_ID, JOB, REGION)
+
+    def test_successful_operation_detector(self):
+        operation_api_response = \
+            {
+                "done": True,
+                "metadata": {
+                    "@type": "type.googleapis.com/google.cloud.dataproc.v1beta2."
+                             "WorkflowMetadata",
+                    "clusterName": "fake-dataproc-cluster",
+                    "createCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-0000-aaaa-bbbb-ffffffffffff"
+                    },
+                    "deleteCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-1111-aaaa-bbbb-ffffffffffff"
+                    },
+                    "graph": {
+                        "nodes": [
+                            {
+                                "jobId": "my-job-abcdefghijklm",
+                                "state": "COMPLETED",
+                                "stepId": "my-job"
+                            }
+                        ]
+                    },
+                    "state": "DONE"
+                },
+                "name": "projects/my-project/regions/us-central1/operations/"
+                        "dddddddd-dddd-dddd-dddd-dddddddddddd",
+                "response": {
+                    "@type": "type.googleapis.com/google.protobuf.Empty"
+                }
+            }
+
+        operation = _DataProcOperation(None, operation_api_response)
+
+        self.assertTrue(operation._check_done())
+
+    def test_failed_operation_detector(self):
+        failure_response = \
+            {
+                "done": True,
+                "metadata": {
+                    "@type": "type.googleapis.com/google.cloud.dataproc."
+                             "v1beta2.WorkflowMetadata",
+                    "clusterName": "fake-dataproc-cluster",
+                    "createCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-0000-aaaa-bbbb-ffffffffffff"
+                    },
+                    "deleteCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-1111-aaaa-bbbb-ffffffffffff"
+                    },
+                    "graph": {
+                        "nodes": [
+                            {
+                                "error": "Google Cloud Dataproc Agent reports job"
+                                         "failure. If logs are available, they can"
+                                         " be found in 'gs://dataproc-00000000-0000-"
+                                         "0000-0000-000000000000-us-central1/"
+                                         "google-cloud-dataproc-metainfo/cccccccc-cccc-"
+                                         "cccc-cccc-cccccccccccc/jobs/"
+                                         "my-job-abcdefghijklm/driveroutput'.",
+                                "jobId": "my-job-abcdefghijklm",
+                                "state": "FAILED",
+                                "stepId": "my-job"
+                            }
+                        ]
+                    },
+                    "state": "DONE"
+                },
+                "name": "projects/my-project/regions/us-central1/operations/"
+                        "dddddddd-dddd-dddd-dddd-dddddddddddd",
+                "response": {
+                    "@type": "type.googleapis.com/google.protobuf.Empty"
+                }
+            }
+
+        operation = _DataProcOperation(None, failure_response)
+
+        with self.assertRaises(Exception) as context:
+            operation._check_done()
+
+        self.assertTrue(str(context.exception).startswith(
+            'Google Dataproc Operation'))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> GCP DataProc Workflow Template operators report success when jobs fail
> ----------------------------------------------------------------------
>
>                 Key: AIRFLOW-2549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2549
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Kevin McHale
>            Assignee: Kevin McHale
>            Priority: Major
>
> cc: [~DanSedov] [~fenglu]
>  
> The Google DataProc workflow template operators use the [_DataProcOperation|https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/gcp_dataproc_hook.py#L149] class to analyze the outcome of a workflow template instance, but that class does not properly detect errors.
>  
> Specifically, when any one of the jobs in the template fails, the operator should report an error. Instead, it always reports success, because it never examines the per-job results contained in the API response.
>  
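> The outcomes of individual jobs are reported under the {{metadata.graph.nodes}} path of the API response, and each node's {{error}} field needs to be checked. However, the existing {{_DataProcOperation}} class only checks for the presence of the top-level {{done}} and {{error}} fields. A workflow template operation does not set the top-level {{error}} field when an individual job fails: in the example below, {{error}} appears only on the failed node.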
>  
> To illustrate, below is an example API response object for a failed DataProc workflow template operation. I pulled it directly from the DataProc API and anonymized it:
> {code:java}
> {
>   "response": {
>     "@type": "type.googleapis.com/google.protobuf.Empty"
>   },
>   "done": true,
>   "name": "projects/my-project/regions/us-central1/operations/dddddddd-dddd-dddd-dddd-dddddddddddd",
>   "metadata": {
>     "createCluster": {
>       "done": true,
>       "operationId": "projects/my-project/regions/us-central1/operations/1111111-0000-aaaa-bbbb-ffffffffffff"
>     },
>     "clusterName": "fake-dataproc-cluster",
>     "graph": {
>       "nodes": [
>         {
>           "state": "FAILED",
>           "jobId": "my-job-abcdefghijklm",
>           "stepId": "my-job",
>           "error": "Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found in 'gs://dataproc-00000000-0000-0000-0000-000000000000-us-central1/google-cloud-dataproc-metainfo/cccccccc-cccc-cccc-cccc-cccccccccccc/jobs/my-job-abcdefghijklm/driveroutput'."
>         }
>       ]
>     },
>     "state": "DONE",
>     "deleteCluster": {
>       "done": true,
>       "operationId": "projects/my-project/regions/us-central1/operations/1111111-1111-aaaa-bbbb-ffffffffffff"
>     },
>     "@type": "type.googleapis.com/google.cloud.dataproc.v1beta2.WorkflowMetadata"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)