Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/09 20:57:10 UTC

incubator-airflow git commit: [AIRFLOW-1756] Fix S3TaskHandler to work with Boto3-based S3Hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 28411b1e7 -> 715602ce6


[AIRFLOW-1756] Fix S3TaskHandler to work with Boto3-based S3Hook

The change from boto2 to boto3 in S3Hook caused this to break (the return
type of `hook.get_key()` changed). There is a better-suited method,
`read_key()`, that we should use anyway.
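
For context (a minimal sketch, not part of the commit; the bucket/key path
is made up and a working S3 connection is assumed): under boto2,
`get_key()` returned a `boto.s3.key.Key` with a `get_contents_as_string()`
method, while the boto3-based S3Hook returns an S3 Object that has no such
method, so the old call chain raises AttributeError. `read_key()` fetches
and decodes the body in one step:

    # Hedged sketch, not part of the commit; assumes a configured S3 connection.
    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook()
    location = 's3://bucket/remote/log/location'  # hypothetical key

    # boto2-era pattern (breaks under boto3: the returned object has no
    # get_contents_as_string() method):
    #   key = hook.get_key(location)
    #   body = key.get_contents_as_string().decode()

    # boto3-based pattern: read_key() fetches and decodes the body directly.
    body = hook.read_key(location)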

This wasn't caught by the tests because the mocks weren't updated. Rather
than mocking the return value of the hook, I have changed the tests to use
"moto" (already in use elsewhere in the tests) to mock at the S3 layer,
not our hook.
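
To illustrate the approach (a sketch under the assumption that boto3 and
moto are installed; the bucket and key names here are invented): moto
intercepts boto3 calls against an in-memory fake of S3, so tests exercise
the real hook code paths instead of a hand-rolled mock of the hook:

    # Hedged sketch of mocking at the S3 layer with moto; not part of the commit.
    import unittest

    import boto3
    from moto import mock_s3

    @mock_s3  # intercepts boto3 calls for every test in the class
    class TestWithFakeS3(unittest.TestCase):
        def test_round_trip(self):
            conn = boto3.client('s3')
            # The bucket exists only inside moto's in-memory AWS account.
            conn.create_bucket(Bucket='bucket')
            conn.put_object(Bucket='bucket', Key='remote/log/location',
                            Body=b'Log line\n')
            obj = conn.get_object(Bucket='bucket', Key='remote/log/location')
            self.assertEqual(obj['Body'].read(), b'Log line\n')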

Closes #2773 from ashb/AIRFLOW-1756-s3-logging-boto3-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/715602ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/715602ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/715602ce

Branch: refs/heads/master
Commit: 715602ce6a78d773ca85397cf8a0fa85afe42b74
Parents: 28411b1
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Thu Nov 9 21:57:04 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Nov 9 21:57:04 2017 +0100

----------------------------------------------------------------------
 airflow/utils/log/s3_task_handler.py    |  12 ++-
 tests/utils/log/test_s3_task_handler.py | 128 +++++++++++++++------------
 2 files changed, 74 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/715602ce/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 1e56655..cfa966a 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -127,14 +127,12 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         :type return_error: bool
         """
         try:
-            s3_key = self.hook.get_key(remote_log_location)
-            if s3_key:
-                return s3_key.get_contents_as_string().decode()
+            return self.hook.read_key(remote_log_location)
         except:
+            msg = 'Could not read logs from {}'.format(remote_log_location)
+            self.log.exception(msg)
             # return error if needed
             if return_error:
-                msg = 'Could not read logs from {}'.format(remote_log_location)
-                self.log.error(msg)
                 return msg
 
     def s3_write(self, log, remote_log_location, append=True):
@@ -149,7 +147,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             the new log is appended to any existing logs.
         :type append: bool
         """
-        if append:
+        if append and self.s3_log_exists(remote_log_location):
             old_log = self.s3_read(remote_log_location)
             log = '\n'.join([old_log, log]) if old_log else log
 
@@ -161,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
                 encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
             )
         except:
-            self.log.error('Could not write logs to %s', remote_log_location)
+            self.log.exception('Could not write logs to %s', remote_log_location)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/715602ce/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index da879b6..b1354cd 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -12,46 +12,66 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime
 import mock
 import unittest
 
+from airflow import configuration
 from airflow.utils.log.s3_task_handler import S3TaskHandler
+from airflow.hooks.S3_hook import S3Hook
+from airflow.models import TaskInstance, DAG
+from airflow.operators.dummy_operator import DummyOperator
 
+try:
+    import boto3
+    import moto
+    from moto import mock_s3
+except ImportError:
+    mock_s3 = None
 
+
+@unittest.skipIf(mock_s3 is None,
+                 "Skipping test because moto.mock_s3 is not available")
+@mock_s3
 class TestS3TaskHandler(unittest.TestCase):
 
     def setUp(self):
         super(TestS3TaskHandler, self).setUp()
-        self.remote_log_location = 'remote/log/location'
+        self.remote_log_location = 's3://bucket/remote/log/location'
+        self.remote_log_key = 'remote/log/location'
         self.local_log_location = 'local/log/location'
-        self.s3_log_location = 's3/log/location'
-        self.filename_template = ''
-        self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
-        self.hook_mock = self.hook_patcher.start()
-        self.hook_inst_mock = self.hook_mock.return_value
-        self.hook_key_mock = self.hook_inst_mock.get_key.return_value
-        self.hook_key_mock.get_contents_as_string.return_value.decode.\
-            return_value = 'content'
+        self.filename_template = '{try_number}.log'
         self.s3_task_handler = S3TaskHandler(
             self.local_log_location,
-            self.s3_log_location,
+            self.remote_log_location,
             self.filename_template
         )
 
-    def tearDown(self):
-        self.hook_patcher.stop()
-        super(TestS3TaskHandler, self).tearDown()
+        configuration.load_test_config()
+        date = datetime(2016, 1, 1)
+        self.dag = DAG('dag_for_testing_file_task_handler', start_date=date)
+        task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag)
+        self.ti = TaskInstance(task=task, execution_date=date)
+        self.ti.try_number = 1
+        self.addCleanup(self.dag.clear)
+
+        self.conn = boto3.client('s3')
+        # We need to create the bucket since this is all in Moto's 'virtual'
+        # AWS account
+        moto.core.moto_api_backend.reset()
+        self.conn.create_bucket(Bucket="bucket")
 
-    def test_init(self):
-        self.s3_task_handler.hook()
-        self.hook_mock.assert_called_once_with('')
+    def test_hook(self):
+        self.assertIsInstance(self.s3_task_handler.hook, S3Hook)
 
-    def test_init_raises(self):
-        self.hook_mock.side_effect = Exception('Failed to connect')
+    def test_hook_raises(self):
         handler = self.s3_task_handler
         with mock.patch.object(handler.log, 'error') as mock_error:
-            # Initialize the hook
-            handler.hook
+            with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+                mock_hook.side_effect = Exception('Failed to connect')
+                # Initialize the hook
+                handler.hook
+
             mock_error.assert_called_once_with(
                 'Could not create an S3Hook with connection id "%s". Please make '
                 'sure that airflow[s3] is installed and the S3 connection exists.',
@@ -59,66 +79,56 @@ class TestS3TaskHandler(unittest.TestCase):
             )
 
     def test_log_exists(self):
+        self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'')
         self.assertTrue(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_log_exists_none(self):
-        self.hook_inst_mock.get_key.return_value = None
         self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_log_exists_raises(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
-
-    def test_log_exists_false(self):
-        self.hook_inst_mock.get_key.return_value = None
-        self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
+        self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo'))
 
     def test_log_exists_no_hook(self):
-        self.hook_mock.side_effect = Exception('Failed to connect')
-        self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
+        with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+            mock_hook.side_effect = Exception('Failed to connect')
+            self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
     def test_read(self):
+        self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', Body=b'Log line\n')
         self.assertEqual(
-            self.s3_task_handler.s3_read(self.remote_log_location),
-            'content'
+            self.s3_task_handler.read(self.ti),
+            ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n']
         )
 
-    def test_read_key_empty(self):
-        self.hook_inst_mock.get_key.return_value = None
-        self.assertEqual(self.s3_task_handler.s3_read(self.remote_log_location), None)
-
-    def test_read_raises(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
-        self.assertEqual(self.s3_task_handler.s3_read(self.remote_log_location), None)
-
     def test_read_raises_return_error(self):
-        self.hook_inst_mock.get_key.side_effect = Exception('error')
         handler = self.s3_task_handler
+        url = 's3://nonexistentbucket/foo'
         with mock.patch.object(handler.log, 'error') as mock_error:
-            result = handler.s3_read(
-                self.remote_log_location,
-                return_error=True
-            )
-            msg = 'Could not read logs from %s' % self.remote_log_location
+            result = handler.s3_read(url, return_error=True)
+            msg = 'Could not read logs from %s' % url
             self.assertEqual(result, msg)
-            mock_error.assert_called_once_with(msg)
+            mock_error.assert_called_once_with(msg, exc_info=True)
 
     def test_write(self):
+        with mock.patch.object(self.s3_task_handler.log, 'error') as mock_error:
+            self.s3_task_handler.s3_write('text', self.remote_log_location)
+            # We shouldn't expect any error logs in the default working case.
+            mock_error.assert_not_called()
+        body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read()
+
+        self.assertEqual(body, b'text')
+
+    def test_write_existing(self):
+        self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'previous ')
         self.s3_task_handler.s3_write('text', self.remote_log_location)
-        self.hook_inst_mock.load_string.assert_called_once_with(
-            'content\ntext',
-            key=self.remote_log_location,
-            replace=True,
-            encrypt=False,
-        )
+        body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read()
+
+        self.assertEqual(body, b'previous \ntext')
 
     def test_write_raises(self):
-        self.hook_inst_mock.read_key.return_value = ''
-        self.hook_inst_mock.load_string.side_effect = Exception('error')
         handler = self.s3_task_handler
+        url = 's3://nonexistentbucket/foo'
         with mock.patch.object(handler.log, 'error') as mock_error:
-            handler.s3_write('text', self.remote_log_location)
-            mock_error.assert_called_once_with(
-                'Could not write logs to %s',
-                'remote/log/location'
-            )
+            handler.s3_write('text', url)
+            mock_error.assert_called_once_with('Could not write logs to %s', url, exc_info=True)