You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/09 20:57:10 UTC
incubator-airflow git commit: [AIRFLOW-1756] Fix S3TaskHandler to
work with Boto3-based S3Hook
Repository: incubator-airflow
Updated Branches:
refs/heads/master 28411b1e7 -> 715602ce6
[AIRFLOW-1756] Fix S3TaskHandler to work with Boto3-based S3Hook
The change from boto2 to boto3 in S3Hook caused
this to break (the
return type of `hook.get_key()` changed). There's a
better method
designed for this that we should use anyway.
This wasn't caught by the tests as the mocks
weren't updated. Rather
than mocking the return of the hook I have changed
it to use "moto"
(already in use elsewhere in the tests) to mock at
the S3 layer, not
our hook.
Closes #2773 from ashb/AIRFLOW-1756-s3-logging-
boto3-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/715602ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/715602ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/715602ce
Branch: refs/heads/master
Commit: 715602ce6a78d773ca85397cf8a0fa85afe42b74
Parents: 28411b1
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Thu Nov 9 21:57:04 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Nov 9 21:57:04 2017 +0100
----------------------------------------------------------------------
airflow/utils/log/s3_task_handler.py | 12 ++-
tests/utils/log/test_s3_task_handler.py | 128 +++++++++++++++------------
2 files changed, 74 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/715602ce/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 1e56655..cfa966a 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -127,14 +127,12 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
:type return_error: bool
"""
try:
- s3_key = self.hook.get_key(remote_log_location)
- if s3_key:
- return s3_key.get_contents_as_string().decode()
+ return self.hook.read_key(remote_log_location)
except:
+ msg = 'Could not read logs from {}'.format(remote_log_location)
+ self.log.exception(msg)
# return error if needed
if return_error:
- msg = 'Could not read logs from {}'.format(remote_log_location)
- self.log.error(msg)
return msg
def s3_write(self, log, remote_log_location, append=True):
@@ -149,7 +147,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
the new log is appended to any existing logs.
:type append: bool
"""
- if append:
+ if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
log = '\n'.join([old_log, log]) if old_log else log
@@ -161,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
)
except:
- self.log.error('Could not write logs to %s', remote_log_location)
+ self.log.exception('Could not write logs to %s', remote_log_location)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/715602ce/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index da879b6..b1354cd 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -12,46 +12,66 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from datetime import datetime
import mock
import unittest
+from airflow import configuration
from airflow.utils.log.s3_task_handler import S3TaskHandler
+from airflow.hooks.S3_hook import S3Hook
+from airflow.models import TaskInstance, DAG
+from airflow.operators.dummy_operator import DummyOperator
+try:
+ import boto3
+ import moto
+ from moto import mock_s3
+except ImportError:
+ mock_s3 = None
+
+@unittest.skipIf(mock_s3 is None,
+ "Skipping test because moto.mock_s3 is not available")
+@mock_s3
class TestS3TaskHandler(unittest.TestCase):
def setUp(self):
super(TestS3TaskHandler, self).setUp()
- self.remote_log_location = 'remote/log/location'
+ self.remote_log_location = 's3://bucket/remote/log/location'
+ self.remote_log_key = 'remote/log/location'
self.local_log_location = 'local/log/location'
- self.s3_log_location = 's3/log/location'
- self.filename_template = ''
- self.hook_patcher = mock.patch("airflow.hooks.S3_hook.S3Hook")
- self.hook_mock = self.hook_patcher.start()
- self.hook_inst_mock = self.hook_mock.return_value
- self.hook_key_mock = self.hook_inst_mock.get_key.return_value
- self.hook_key_mock.get_contents_as_string.return_value.decode.\
- return_value = 'content'
+ self.filename_template = '{try_number}.log'
self.s3_task_handler = S3TaskHandler(
self.local_log_location,
- self.s3_log_location,
+ self.remote_log_location,
self.filename_template
)
- def tearDown(self):
- self.hook_patcher.stop()
- super(TestS3TaskHandler, self).tearDown()
+ configuration.load_test_config()
+ date = datetime(2016, 1, 1)
+ self.dag = DAG('dag_for_testing_file_task_handler', start_date=date)
+ task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag)
+ self.ti = TaskInstance(task=task, execution_date=date)
+ self.ti.try_number = 1
+ self.addCleanup(self.dag.clear)
+
+ self.conn = boto3.client('s3')
+ # We need to create the bucket since this is all in Moto's 'virtual'
+ # AWS account
+ moto.core.moto_api_backend.reset()
+ self.conn.create_bucket(Bucket="bucket")
- def test_init(self):
- self.s3_task_handler.hook()
- self.hook_mock.assert_called_once_with('')
+ def test_hook(self):
+ self.assertIsInstance(self.s3_task_handler.hook, S3Hook)
- def test_init_raises(self):
- self.hook_mock.side_effect = Exception('Failed to connect')
+ def test_hook_raises(self):
handler = self.s3_task_handler
with mock.patch.object(handler.log, 'error') as mock_error:
- # Initialize the hook
- handler.hook
+ with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+ mock_hook.side_effect = Exception('Failed to connect')
+ # Initialize the hook
+ handler.hook
+
mock_error.assert_called_once_with(
'Could not create an S3Hook with connection id "%s". Please make '
'sure that airflow[s3] is installed and the S3 connection exists.',
@@ -59,66 +79,56 @@ class TestS3TaskHandler(unittest.TestCase):
)
def test_log_exists(self):
+ self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'')
self.assertTrue(self.s3_task_handler.s3_log_exists(self.remote_log_location))
def test_log_exists_none(self):
- self.hook_inst_mock.get_key.return_value = None
self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
def test_log_exists_raises(self):
- self.hook_inst_mock.get_key.side_effect = Exception('error')
- self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
-
- def test_log_exists_false(self):
- self.hook_inst_mock.get_key.return_value = None
- self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
+ self.assertFalse(self.s3_task_handler.s3_log_exists('s3://nonexistentbucket/foo'))
def test_log_exists_no_hook(self):
- self.hook_mock.side_effect = Exception('Failed to connect')
- self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
+ with mock.patch("airflow.hooks.S3_hook.S3Hook") as mock_hook:
+ mock_hook.side_effect = Exception('Failed to connect')
+ self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
def test_read(self):
+ self.conn.put_object(Bucket='bucket', Key='remote/log/location/1.log', Body=b'Log line\n')
self.assertEqual(
- self.s3_task_handler.s3_read(self.remote_log_location),
- 'content'
+ self.s3_task_handler.read(self.ti),
+ ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n']
)
- def test_read_key_empty(self):
- self.hook_inst_mock.get_key.return_value = None
- self.assertEqual(self.s3_task_handler.s3_read(self.remote_log_location), None)
-
- def test_read_raises(self):
- self.hook_inst_mock.get_key.side_effect = Exception('error')
- self.assertEqual(self.s3_task_handler.s3_read(self.remote_log_location), None)
-
def test_read_raises_return_error(self):
- self.hook_inst_mock.get_key.side_effect = Exception('error')
handler = self.s3_task_handler
+ url = 's3://nonexistentbucket/foo'
with mock.patch.object(handler.log, 'error') as mock_error:
- result = handler.s3_read(
- self.remote_log_location,
- return_error=True
- )
- msg = 'Could not read logs from %s' % self.remote_log_location
+ result = handler.s3_read(url, return_error=True)
+ msg = 'Could not read logs from %s' % url
self.assertEqual(result, msg)
- mock_error.assert_called_once_with(msg)
+ mock_error.assert_called_once_with(msg, exc_info=True)
def test_write(self):
+ with mock.patch.object(self.s3_task_handler.log, 'error') as mock_error:
+ self.s3_task_handler.s3_write('text', self.remote_log_location)
+ # We shouldn't expect any error logs in the default working case.
+ mock_error.assert_not_called()
+ body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read()
+
+ self.assertEqual(body, b'text')
+
+ def test_write_existing(self):
+ self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'previous ')
self.s3_task_handler.s3_write('text', self.remote_log_location)
- self.hook_inst_mock.load_string.assert_called_once_with(
- 'content\ntext',
- key=self.remote_log_location,
- replace=True,
- encrypt=False,
- )
+ body = boto3.resource('s3').Object('bucket', self.remote_log_key).get()['Body'].read()
+
+ self.assertEqual(body, b'previous \ntext')
def test_write_raises(self):
- self.hook_inst_mock.read_key.return_value = ''
- self.hook_inst_mock.load_string.side_effect = Exception('error')
handler = self.s3_task_handler
+ url = 's3://nonexistentbucket/foo'
with mock.patch.object(handler.log, 'error') as mock_error:
- handler.s3_write('text', self.remote_log_location)
- mock_error.assert_called_once_with(
- 'Could not write logs to %s',
- 'remote/log/location'
- )
+ handler.s3_write('text', url)
+ self.assertEqual
+ mock_error.assert_called_once_with('Could not write logs to %s', url, exc_info=True)