You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/04 09:28:39 UTC

incubator-airflow git commit: [AIRFLOW-1676] Make GCSTaskHandler write to GCS on close

Repository: incubator-airflow
Updated Branches:
  refs/heads/master bc25d593c -> 96206b0e5


[AIRFLOW-1676] Make GCSTaskHandler write to GCS on close

Closes #2659 from criccomini/AIRFLOW-1676


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96206b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96206b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96206b0e

Branch: refs/heads/master
Commit: 96206b0e5886945bc7ac5436bc7139daf14570d9
Parents: bc25d59
Author: Chris Riccomini <cr...@apache.org>
Authored: Wed Oct 4 11:28:33 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Oct 4 11:28:33 2017 +0200

----------------------------------------------------------------------
 airflow/utils/log/gcs_task_handler.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96206b0e/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index dcdaf6d..bb40e11 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -31,6 +31,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         self.remote_base = gcs_log_folder
         self.log_relative_path = ''
         self._hook = None
+        self.closed = False
 
     def _build_hook(self):
         remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
@@ -67,7 +68,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # calling close method. Here we check if logger is already
         # closed to prevent uploading the log to remote storage multiple
         # times when `logging.shutdown` is called.
-        if self._hook is None:
+        if self.closed:
             return
 
         super(GCSTaskHandler, self).close()
@@ -80,8 +81,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
                 log = logfile.read()
             self.gcs_write(log, remote_loc)
 
-        # Unset variable
-        self._hook = None
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
 
     def _read(self, ti, try_number):
         """
@@ -153,8 +154,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         :type append: bool
         """
         if append:
-            old_log = self.read(remote_log_location)
-            log = '\n'.join([old_log, log])
+            old_log = self.gcs_read(remote_log_location)
+            log = '\n'.join([old_log, log]) if old_log else log
 
         try:
             bkt, blob = self.parse_gcs_url(remote_log_location)