You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:22:43 UTC

[GitHub] stale[bot] closed pull request #3447: [AIRFLOW-2549] Fix DataProcOperation error-check

stale[bot] closed pull request #3447: [AIRFLOW-2549] Fix DataProcOperation error-check
URL: https://github.com/apache/incubator-airflow/pull/3447
 
 
   

This is a PR from a forked repository.
As GitHub hides the original diff once such a PR is merged or closed, it is
displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index ce65b2b915..afa82ce5f9 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -175,21 +175,63 @@ def get(self):
         return self.operation
 
     def _check_done(self):
-        if 'done' in self.operation:
+        """ Return True if the operation finished successfully, False if
+            it is not done yet; raise if it finished with an error.
+        """
+        def _check_error():
+            """ Check the operation for errors.  Precondition is that the
+                operation must be marked as done already.
+
+                Returns a (failed, error_message) tuple; error_message is
+                None when failed is False.
+            """
             if 'error' in self.operation:
-                self.log.warning(
-                    'Dataproc Operation %s failed with error: %s',
-                    self.operation_name, self.operation['error']['message'])
-                self._raise_error()
-            else:
-                self.log.info(
-                    'Dataproc Operation %s done', self.operation['name'])
-                return True
-        return False
+                return (True, self.operation['error']['message'])
+
+            # Dataproc workflow templates do not set the 'error' field when
+            # jobs fail; we have to examine the individual jobs for failures.
+            metadata = self.operation.get('metadata', {})
+            if not metadata.get('@type', '').endswith('WorkflowMetadata'):
+                return (False, None)
+
+            nodes = metadata.get('graph', {}).get('nodes', [])
+            error_nodes = [node for node in nodes if node.get('error')]
+            if not error_nodes:
+                return (False, None)
+
+            # Key each failure by jobId, falling back to stepId: a failed
+            # node without a 'jobId' (NOTE(review): confirm whether the API
+            # can return one) must not turn the error report into a KeyError
+            # that hides the real failure.
+            return (True, str({
+                node.get('jobId', node.get('stepId')): node['error']
+                for node in error_nodes}))
+
+        if not self.operation.get('done'):
+            # either the done field is not present, or it is false
+            return False
+
+        (operation_failed, error_message) = _check_error()
+        if operation_failed:
+            self.log.warning(
+                'Dataproc Operation %s failed with error: %s',
+                self.operation_name, error_message)
+            self._raise_error(error_message)
+        else:
+            self.log.info(
+                'Dataproc Operation %s done', self.operation_name)
+            return True
 
-    def _raise_error(self):
+    def _raise_error(self, error_message=None):
+        """ Raise an Exception reporting that the operation failed.
+
+            error_message defaults to the message in the operation's
+            'error' field, so callers that pass no argument still work.
+        """
+        if error_message is None:
+            error_message = self.operation['error']['message']
         raise Exception('Google Dataproc Operation %s failed: %s' %
-                        (self.operation_name, self.operation['error']['message']))
+                        (self.operation_name, error_message))
 
 
 class DataProcHook(GoogleCloudBaseHook):
diff --git a/tests/contrib/hooks/test_gcp_dataproc_hook.py b/tests/contrib/hooks/test_gcp_dataproc_hook.py
index f2629ff148..26f059d35c 100644
--- a/tests/contrib/hooks/test_gcp_dataproc_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataproc_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,7 @@
 #
 
 import unittest
-from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
+from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook, _DataProcOperation
 
 try:
     from unittest import mock
@@ -48,6 +48,104 @@ def setUp(self):
 
     @mock.patch(DATAPROC_STRING.format('_DataProcJob'))
     def test_submit(self, job_mock):
-      with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn', return_value=None)):
-        self.dataproc_hook.submit(PROJECT_ID, JOB)
-        job_mock.assert_called_once_with(mock.ANY, PROJECT_ID, JOB, REGION)
+        # return_value must go to mock.patch; str.format silently ignores
+        # unused keyword arguments, so passing it there has no effect.
+        with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn'),
+                        return_value=None):
+            self.dataproc_hook.submit(PROJECT_ID, JOB)
+            job_mock.assert_called_once_with(mock.ANY, PROJECT_ID, JOB, REGION)
+
+    def test_successful_operation_detector(self):
+        """A done workflow whose graph nodes report no error is a success."""
+        operation_api_response = \
+            {
+                "done": True,
+                "metadata": {
+                    "@type": "type.googleapis.com/google.cloud.dataproc.v1beta2."
+                             "WorkflowMetadata",
+                    "clusterName": "fake-dataproc-cluster",
+                    "createCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-0000-aaaa-bbbb-ffffffffffff"
+                    },
+                    "deleteCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-1111-aaaa-bbbb-ffffffffffff"
+                    },
+                    "graph": {
+                        "nodes": [
+                            {
+                                "jobId": "my-job-abcdefghijklm",
+                                "state": "COMPLETED",
+                                "stepId": "my-job"
+                            }
+                        ]
+                    },
+                    "state": "DONE"
+                },
+                "name": "projects/my-project/regions/us-central1/operations/"
+                        "dddddddd-dddd-dddd-dddd-dddddddddddd",
+                "response": {
+                    "@type": "type.googleapis.com/google.protobuf.Empty"
+                }
+            }
+
+        operation = _DataProcOperation(None, operation_api_response)
+
+        self.assertTrue(operation._check_done())
+
+    def test_failed_operation_detector(self):
+        """A done workflow with a graph node that has an error must raise."""
+        failure_response = \
+            {
+                "done": True,
+                "metadata": {
+                    "@type": "type.googleapis.com/google.cloud.dataproc."
+                             "v1beta2.WorkflowMetadata",
+                    "clusterName": "fake-dataproc-cluster",
+                    "createCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-0000-aaaa-bbbb-ffffffffffff"
+                    },
+                    "deleteCluster": {
+                        "done": True,
+                        "operationId": "projects/my-project/regions/us-central1/"
+                                       "operations/1111111-1111-aaaa-bbbb-ffffffffffff"
+                    },
+                    "graph": {
+                        "nodes": [
+                            {
+                                "error": "Google Cloud Dataproc Agent reports job "
+                                         "failure. If logs are available, they can"
+                                         " be found in 'gs://dataproc-00000000-0000-"
+                                         "0000-0000-000000000000-us-central1/"
+                                         "google-cloud-dataproc-metainfo/cccccccc-cccc-"
+                                         "cccc-cccc-cccccccccccc/jobs/"
+                                         "my-job-abcdefghijklm/driveroutput'.",
+                                "jobId": "my-job-abcdefghijklm",
+                                "state": "FAILED",
+                                "stepId": "my-job"
+                            }
+                        ]
+                    },
+                    "state": "DONE"
+                },
+                "name": "projects/my-project/regions/us-central1/operations/"
+                        "dddddddd-dddd-dddd-dddd-dddddddddddd",
+                "response": {
+                    "@type": "type.googleapis.com/google.protobuf.Empty"
+                }
+            }
+
+        operation = _DataProcOperation(None, failure_response)
+
+        with self.assertRaises(Exception) as context:
+            operation._check_done()
+
+        self.assertTrue(str(context.exception).startswith(
+            'Google Dataproc Operation'))
+        # The error report should name the job that failed.
+        self.assertIn('my-job-abcdefghijklm', str(context.exception))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services