You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/12/05 19:24:39 UTC

incubator-airflow git commit: [AIRFLOW-1869] Write more error messages into gcs and file logs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master a9ceca5e0 -> 06b41fbe1


[AIRFLOW-1869] Write more error messages into gcs and file logs

Closes #2826 from wrp/gcs-log


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06b41fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06b41fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06b41fbe

Branch: refs/heads/master
Commit: 06b41fbe1bf94ca2013fe164ee275f9fbac92973
Parents: a9ceca5
Author: William Pursell <wi...@wepay.com>
Authored: Tue Dec 5 11:24:35 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Dec 5 11:24:35 2017 -0800

----------------------------------------------------------------------
 airflow/utils/log/file_task_handler.py | 10 +++---
 airflow/utils/log/gcs_task_handler.py  | 54 +++++++++--------------------
 tests/www/test_views.py                |  2 +-
 3 files changed, 24 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6038fbf..f131c09 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -97,9 +97,11 @@ class FileTaskHandler(logging.Handler):
         if os.path.exists(location):
             try:
                 with open(location) as f:
-                    log += "*** Reading local log.\n" + "".join(f.readlines())
+                    log += "*** Reading local file: {}\n".format(location)
+                    log += "".join(f.readlines())
             except Exception as e:
-                log = "*** Failed to load local log file: {}. {}\n".format(location, str(e))
+                log = "*** Failed to load local log file: {}\n".format(location)
+                log += "*** {}\n".format(str(e))
         else:
             url = os.path.join(
                 "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path
@@ -107,8 +109,8 @@ class FileTaskHandler(logging.Handler):
                 ti=ti,
                 worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
             )
-            log += "*** Log file isn't local.\n"
-            log += "*** Fetching here: {url}\n".format(**locals())
+            log += "*** Log file does not exist: {}\n".format(location)
+            log += "*** Fetching from: {}\n".format(url)
             try:
                 timeout = None  # No timeout
                 try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index c11e7ad..b556cf0 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -40,11 +40,11 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             return GoogleCloudStorageHook(
                 google_cloud_storage_conn_id=remote_conn_id
             )
-        except:
+        except Exception as e:
             self.log.error(
                 'Could not create a GoogleCloudStorageHook with connection id '
-                '"%s". Please make sure that airflow[gcp_api] is installed '
-                'and the GCS connection exists.', remote_conn_id
+                '"{}". {}\n\nPlease make sure that airflow[gcp_api] is installed '
+                'and the GCS connection exists.'.format(remote_conn_id, str(e))
             )
 
     @property
@@ -97,49 +97,26 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         log_relative_path = self._render_filename(ti, try_number + 1)
         remote_loc = os.path.join(self.remote_base, log_relative_path)
 
-        if self.gcs_log_exists(remote_loc):
-            # If GCS remote file exists, we do not fetch logs from task instance
-            # local machine even if there are errors reading remote logs, as
-            # remote_log will contain error message.
-            remote_log = self.gcs_read(remote_loc, return_error=True)
+        try:
+            remote_log = self.gcs_read(remote_loc)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
-        else:
-            log = super(GCSTaskHandler, self)._read(ti, try_number)
+        except Exception as e:
+            log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
+                remote_loc, str(e))
+            self.log.error(log)
+            log += super(GCSTaskHandler, self)._read(ti, try_number)
 
         return log
 
-    def gcs_log_exists(self, remote_log_location):
-        """
-        Check if remote_log_location exists in remote storage
-        :param remote_log_location: log's location in remote storage
-        :return: True if location exists else False
-        """
-        try:
-            bkt, blob = self.parse_gcs_url(remote_log_location)
-            return self.hook.exists(bkt, blob)
-        except Exception:
-            pass
-        return False
-
-    def gcs_read(self, remote_log_location, return_error=False):
+    def gcs_read(self, remote_log_location):
         """
         Returns the log found at the remote_log_location.
         :param remote_log_location: the log's location in remote storage
         :type remote_log_location: string (path)
-        :param return_error: if True, returns a string error message if an
-            error occurs. Otherwise returns '' when an error occurs.
-        :type return_error: bool
         """
-        try:
-            bkt, blob = self.parse_gcs_url(remote_log_location)
-            return self.hook.download(bkt, blob).decode()
-        except:
-            # return error if needed
-            if return_error:
-                msg = 'Could not read logs from {}'.format(remote_log_location)
-                self.log.error(msg)
-                return msg
+        bkt, blob = self.parse_gcs_url(remote_log_location)
+        return self.hook.download(bkt, blob).decode()
 
     def gcs_write(self, log, remote_log_location, append=True):
         """
@@ -154,7 +131,10 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         :type append: bool
         """
         if append:
-            old_log = self.gcs_read(remote_log_location)
+            try:
+                old_log = self.gcs_read(remote_log_location)
+            except Exception as e:
+                old_log = '*** Previous log discarded: {}\n\n'.format(str(e))
             log = '\n'.join([old_log, log]) if old_log else log
 
         try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index f5b015e..0051848 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -374,7 +374,7 @@ class TestLogView(unittest.TestCase):
             follow_redirects=True,
         )
         self.assertEqual(response.status_code, 200)
-        self.assertIn('Log file isn',
+        self.assertIn('Log file does not exist',
                       response.data.decode('utf-8'))