You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/04 09:28:39 UTC
incubator-airflow git commit: [AIRFLOW-1676] Make GCSTaskHandler write to GCS on close
Repository: incubator-airflow
Updated Branches:
refs/heads/master bc25d593c -> 96206b0e5
[AIRFLOW-1676] Make GCSTaskHandler write to GCS on close
Closes #2659 from criccomini/AIRFLOW-1676
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96206b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96206b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96206b0e
Branch: refs/heads/master
Commit: 96206b0e5886945bc7ac5436bc7139daf14570d9
Parents: bc25d59
Author: Chris Riccomini <cr...@apache.org>
Authored: Wed Oct 4 11:28:33 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Oct 4 11:28:33 2017 +0200
----------------------------------------------------------------------
airflow/utils/log/gcs_task_handler.py | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96206b0e/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index dcdaf6d..bb40e11 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -31,6 +31,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
self.remote_base = gcs_log_folder
self.log_relative_path = ''
self._hook = None
+ self.closed = False
def _build_hook(self):
remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
@@ -67,7 +68,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
# calling close method. Here we check if logger is already
# closed to prevent uploading the log to remote storage multiple
# times when `logging.shutdown` is called.
- if self._hook is None:
+ if self.closed:
return
super(GCSTaskHandler, self).close()
@@ -80,8 +81,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
log = logfile.read()
self.gcs_write(log, remote_loc)
- # Unset variable
- self._hook = None
+ # Mark closed so we don't double write if close is called twice
+ self.closed = True
def _read(self, ti, try_number):
"""
@@ -153,8 +154,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
:type append: bool
"""
if append:
- old_log = self.read(remote_log_location)
- log = '\n'.join([old_log, log])
+ old_log = self.gcs_read(remote_log_location)
+ log = '\n'.join([old_log, log]) if old_log else log
try:
bkt, blob = self.parse_gcs_url(remote_log_location)