Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/19 20:04:25 UTC

incubator-airflow git commit: [AIRFLOW-1916] Don't upload logs to remote from `run --raw`

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 81558f3d0 -> 4ce4faaea


[AIRFLOW-1916] Don't upload logs to remote from `run --raw`

In a previous change we removed the airflow.task.raw handler (which
printed directly to stdout) and replaced it with one that writes to the
log file itself. The problem is that Python automatically calls
`logging.shutdown()` on clean process exit. This ended up uploading the
log file twice: once at the end of `airflow run --raw`, and then again
from the explicit shutdown() call at the end of cli's `run()`.

Since logging is shut down automatically, this change adds an explicit
flag that controls whether the GCS and S3 handlers should upload the
file, and tells them not to when running with `--raw`.

Closes #2880 from ashb/AIRFLOW-1916-dont-upload-logs-twice
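
For background, the early `return` removed below never prevented the
upload in the raw process anyway, because the logging module registers
shutdown() with atexit. A minimal sketch of that behaviour (the handler
and file names are illustrative, not Airflow code):

    import logging

    class UploadingHandler(logging.FileHandler):
        def close(self):
            super(UploadingHandler, self).close()
            # stand-in for the S3/GCS upload performed on handler close
            print("uploading %s" % self.baseFilename)

    logging.getLogger().addHandler(UploadingHandler("/tmp/demo.log"))
    # Returning early from main() would not help: CPython's logging
    # module calls atexit.register(shutdown), so close() -- and the
    # upload -- still runs on clean interpreter exit.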


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4ce4faae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4ce4faae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4ce4faae

Branch: refs/heads/master
Commit: 4ce4faaeae7a76d97defcf9a9d3304ac9d78b9bd
Parents: 81558f3
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Tue Dec 19 21:04:18 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Dec 19 21:04:18 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                      |  7 ++---
 airflow/models.py                       |  6 +++-
 airflow/utils/log/gcs_task_handler.py   |  5 ++++
 airflow/utils/log/s3_task_handler.py    |  5 ++++
 tests/utils/log/test_s3_task_handler.py | 42 ++++++++++++++++++++++++----
 5 files changed, 53 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3e954dc..e98838d 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -362,7 +362,8 @@ def run(args, dag=None):
     task = dag.get_task(task_id=args.task_id)
     ti = TaskInstance(task, args.execution_date)
     ti.refresh_from_db()
-    ti.init_run_context()
+
+    ti.init_run_context(raw=args.raw)
 
     hostname = socket.getfqdn()
     log.info("Running %s on host %s", ti, hostname)
@@ -419,10 +420,6 @@ def run(args, dag=None):
             executor.heartbeat()
             executor.end()
 
-    # Child processes should not flush or upload to remote
-    if args.raw:
-        return
-
     logging.shutdown()
 
 
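The net effect on control flow, reduced to a skeleton (a sketch, not the
actual cli code; `airflow run` executes the task in an
`airflow run --raw` child process):

    # parent: airflow run
    ti.init_run_context(raw=False)  # handlers keep upload_on_close = True
    # ... executor heartbeats while the --raw child does the real work ...
    logging.shutdown()              # closes handlers, uploads the log once

    # child: airflow run --raw
    ti.init_run_context(raw=True)   # handlers set upload_on_close = False
    # ... task executes and logs here; the implicit shutdown() at process
    # exit flushes the local file but skips the upload ...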

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d69bc57..b2f3bac 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -807,6 +807,9 @@ class TaskInstance(Base, LoggingMixin):
         self.hostname = ''
         self.init_on_load()
         self._log = logging.getLogger("airflow.task")
+        # Is this TaskInstance currently being run within `airflow run --raw`?
+        # Not persisted to the database, so only valid for the current process.
+        self.is_raw = False
 
     @reconstructor
     def init_on_load(self):
@@ -1879,11 +1882,12 @@ class TaskInstance(Base, LoggingMixin):
             TI.state == State.RUNNING
         ).count()
 
-    def init_run_context(self):
+    def init_run_context(self, raw=False):
         """
         Sets the log context.
         """
         self._set_context(self)
+        self.is_raw = raw
 
 
 class TaskFail(Base):

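The new flag is deliberately process-local: it is never persisted, so
the only supported way to set it is through init_run_context().
Illustrative usage, mirroring what cli's run() does:

    ti = TaskInstance(task, execution_date)
    ti.refresh_from_db()
    ti.init_run_context(raw=True)  # as passed in by `airflow run --raw`
    assert ti.is_raw               # remote log handlers will skip the upload
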
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index a87d1d4..f68165f 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -32,6 +32,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         self.log_relative_path = ''
         self._hook = None
         self.closed = False
+        self.upload_on_close = True
 
     def _build_hook(self):
         remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
@@ -59,6 +60,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # log path to upload log files into GCS and read from the
         # remote location.
         self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.is_raw
 
     def close(self):
         """
@@ -73,6 +75,9 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
 
         super(GCSTaskHandler, self).close()
 
+        if not self.upload_on_close:
+            return
+
         local_loc = os.path.join(self.local_base, self.log_relative_path)
         remote_loc = os.path.join(self.remote_base, self.log_relative_path)
         if os.path.exists(local_loc):

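Both remote handlers now share the same shape at close time: an
idempotency guard (`logging.shutdown()` and an explicit close() may both
fire), the base-class flush of the local log file, and only then the new
early return. Condensed sketch, not the verbatim method (the S3 handler
below is structured identically):

    def close(self):
        if self.closed:                      # already flushed/uploaded once
            return
        super(GCSTaskHandler, self).close()  # flush and close the local file
        if not self.upload_on_close:         # raw run: keep the log local only
            return
        # ... read the local file and copy it to the remote location ...
        self.closed = True
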
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 5ff90c6..b3acf3a 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -30,6 +30,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         self.log_relative_path = ''
         self._hook = None
         self.closed = False
+        self.upload_on_close = True
 
     def _build_hook(self):
         remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
@@ -54,6 +55,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
         self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.is_raw
 
     def close(self):
         """
@@ -68,6 +70,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
 
         super(S3TaskHandler, self).close()
 
+        if not self.upload_on_close:
+            return
+
         local_loc = os.path.join(self.local_base, self.log_relative_path)
         remote_loc = os.path.join(self.remote_base, self.log_relative_path)
         if os.path.exists(local_loc):

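When the upload does run, it goes through the handler's existing write
helper; the tests below call s3_write() directly. Roughly (a sketch of
the tail of close(), not the verbatim body):

    if os.path.exists(local_loc):
        with open(local_loc, 'r') as logfile:
            log = logfile.read()
        # s3_write logs an error rather than raising if the upload fails
        self.s3_write(log, remote_loc)
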
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce4faae/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index dc32b5a..53c1e36 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -14,9 +14,11 @@
 
 import mock
 import unittest
+import os
 
 from airflow import configuration
 from airflow.utils.log.s3_task_handler import S3TaskHandler
+from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from airflow.hooks.S3_hook import S3Hook
 from airflow.models import TaskInstance, DAG
@@ -37,13 +39,14 @@ class TestS3TaskHandler(unittest.TestCase):
 
     def setUp(self):
         super(TestS3TaskHandler, self).setUp()
-        self.remote_log_location = 's3://bucket/remote/log/location'
-        self.remote_log_key = 'remote/log/location'
+        self.remote_log_base = 's3://bucket/remote/log/location'
+        self.remote_log_location = 's3://bucket/remote/log/location/1.log'
+        self.remote_log_key = 'remote/log/location/1.log'
         self.local_log_location = 'local/log/location'
         self.filename_template = '{try_number}.log'
         self.s3_task_handler = S3TaskHandler(
             self.local_log_location,
-            self.remote_log_location,
+            self.remote_log_base,
             self.filename_template
         )
 
@@ -53,6 +56,7 @@ class TestS3TaskHandler(unittest.TestCase):
         task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag)
         self.ti = TaskInstance(task=task, execution_date=date)
         self.ti.try_number = 1
+        self.ti.state = State.RUNNING
         self.addCleanup(self.dag.clear)
 
         self.conn = boto3.client('s3')
@@ -61,6 +65,13 @@ class TestS3TaskHandler(unittest.TestCase):
         moto.core.moto_api_backend.reset()
         self.conn.create_bucket(Bucket="bucket")
 
+    def tearDown(self):
+        if self.s3_task_handler.handler:
+            try:
+                os.remove(self.s3_task_handler.handler.baseFilename)
+            except Exception:
+                pass
+
     def test_hook(self):
         self.assertIsInstance(self.s3_task_handler.hook, S3Hook)
 
@@ -94,10 +105,11 @@ class TestS3TaskHandler(unittest.TestCase):
             self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_read(self):
-        self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', Body=b'Log line\n')
+        self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n')
         self.assertEqual(
             self.s3_task_handler.read(self.ti),
-            ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n']
+            ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\n'
+             'Log line\n\n']
         )
 
     def test_read_raises_return_error(self):
@@ -131,4 +143,22 @@ class TestS3TaskHandler(unittest.TestCase):
         with mock.patch.object(handler.log, 'error') as mock_error:
             handler.s3_write('text', url)
             self.assertEqual
-            mock_error.assert_called_once_with('Could not write logs to %s', url, exc_info=True)
+            mock_error.assert_called_once_with(
+                'Could not write logs to %s', url, exc_info=True)
+
+    def test_close(self):
+        self.s3_task_handler.set_context(self.ti)
+        self.assertTrue(self.s3_task_handler.upload_on_close)
+
+        self.s3_task_handler.close()
+        # Should not raise
+        boto3.resource('s3').Object('bucket', self.remote_log_key).get()
+
+    def test_close_no_upload(self):
+        self.ti.is_raw = True
+        self.s3_task_handler.set_context(self.ti)
+        self.assertFalse(self.s3_task_handler.upload_on_close)
+        self.s3_task_handler.close()
+
+        with self.assertRaises(self.conn.exceptions.NoSuchKey):
+            boto3.resource('s3').Object('bucket', self.remote_log_key).get()
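
test_close and test_close_no_upload pin down both sides of the new flag
against the moto-mocked bucket: with a normal context the log object
must exist in S3 after close(), and with ti.is_raw set the lookup must
raise NoSuchKey. To run just these two tests (assuming a dev checkout
with boto3 and moto installed; pytest also collects unittest-style
classes):

    python -m pytest tests/utils/log/test_s3_task_handler.py -k close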