You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/12/05 19:24:39 UTC
incubator-airflow git commit: [AIRFLOW-1869] Write more error messages into gcs and file logs
Repository: incubator-airflow
Updated Branches:
refs/heads/master a9ceca5e0 -> 06b41fbe1
[AIRFLOW-1869] Write more error messages into gcs and file logs
Closes #2826 from wrp/gcs-log
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06b41fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06b41fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06b41fbe
Branch: refs/heads/master
Commit: 06b41fbe1bf94ca2013fe164ee275f9fbac92973
Parents: a9ceca5
Author: William Pursell <wi...@wepay.com>
Authored: Tue Dec 5 11:24:35 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Tue Dec 5 11:24:35 2017 -0800
----------------------------------------------------------------------
airflow/utils/log/file_task_handler.py | 10 +++---
airflow/utils/log/gcs_task_handler.py | 54 +++++++++--------------------
tests/www/test_views.py | 2 +-
3 files changed, 24 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6038fbf..f131c09 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -97,9 +97,11 @@ class FileTaskHandler(logging.Handler):
if os.path.exists(location):
try:
with open(location) as f:
- log += "*** Reading local log.\n" + "".join(f.readlines())
+ log += "*** Reading local file: {}\n".format(location)
+ log += "".join(f.readlines())
except Exception as e:
- log = "*** Failed to load local log file: {}. {}\n".format(location, str(e))
+ log = "*** Failed to load local log file: {}\n".format(location)
+ log += "*** {}\n".format(str(e))
else:
url = os.path.join(
"http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path
@@ -107,8 +109,8 @@ class FileTaskHandler(logging.Handler):
ti=ti,
worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
)
- log += "*** Log file isn't local.\n"
- log += "*** Fetching here: {url}\n".format(**locals())
+ log += "*** Log file does not exist: {}\n".format(location)
+ log += "*** Fetching from: {}\n".format(url)
try:
timeout = None # No timeout
try:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index c11e7ad..b556cf0 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -40,11 +40,11 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
return GoogleCloudStorageHook(
google_cloud_storage_conn_id=remote_conn_id
)
- except:
+ except Exception as e:
self.log.error(
'Could not create a GoogleCloudStorageHook with connection id '
- '"%s". Please make sure that airflow[gcp_api] is installed '
- 'and the GCS connection exists.', remote_conn_id
+ '"{}". {}\n\nPlease make sure that airflow[gcp_api] is installed '
+ 'and the GCS connection exists.'.format(remote_conn_id, str(e))
)
@property
@@ -97,49 +97,26 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
log_relative_path = self._render_filename(ti, try_number + 1)
remote_loc = os.path.join(self.remote_base, log_relative_path)
- if self.gcs_log_exists(remote_loc):
- # If GCS remote file exists, we do not fetch logs from task instance
- # local machine even if there are errors reading remote logs, as
- # remote_log will contain error message.
- remote_log = self.gcs_read(remote_loc, return_error=True)
+ try:
+ remote_log = self.gcs_read(remote_loc)
log = '*** Reading remote log from {}.\n{}\n'.format(
remote_loc, remote_log)
- else:
- log = super(GCSTaskHandler, self)._read(ti, try_number)
+ except Exception as e:
+ log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
+ remote_loc, str(e))
+ self.log.error(log)
+ log += super(GCSTaskHandler, self)._read(ti, try_number)
return log
- def gcs_log_exists(self, remote_log_location):
- """
- Check if remote_log_location exists in remote storage
- :param remote_log_location: log's location in remote storage
- :return: True if location exists else False
- """
- try:
- bkt, blob = self.parse_gcs_url(remote_log_location)
- return self.hook.exists(bkt, blob)
- except Exception:
- pass
- return False
-
- def gcs_read(self, remote_log_location, return_error=False):
+ def gcs_read(self, remote_log_location):
"""
Returns the log found at the remote_log_location.
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
- :param return_error: if True, returns a string error message if an
- error occurs. Otherwise returns '' when an error occurs.
- :type return_error: bool
"""
- try:
- bkt, blob = self.parse_gcs_url(remote_log_location)
- return self.hook.download(bkt, blob).decode()
- except:
- # return error if needed
- if return_error:
- msg = 'Could not read logs from {}'.format(remote_log_location)
- self.log.error(msg)
- return msg
+ bkt, blob = self.parse_gcs_url(remote_log_location)
+ return self.hook.download(bkt, blob).decode()
def gcs_write(self, log, remote_log_location, append=True):
"""
@@ -154,7 +131,10 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
:type append: bool
"""
if append:
- old_log = self.gcs_read(remote_log_location)
+ try:
+ old_log = self.gcs_read(remote_log_location)
+ except Exception as e:
+ old_log = '*** Previous log discarded: {}\n\n'.format(str(e))
log = '\n'.join([old_log, log]) if old_log else log
try:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b41fbe/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index f5b015e..0051848 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -374,7 +374,7 @@ class TestLogView(unittest.TestCase):
follow_redirects=True,
)
self.assertEqual(response.status_code, 200)
- self.assertIn('Log file isn',
+ self.assertIn('Log file does not exist',
response.data.decode('utf-8'))