You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/04 10:35:55 UTC
[airflow] branch main updated: Apply flake8-logging-format changes to tests (#24931)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd06797c10 Apply flake8-logging-format changes to tests (#24931)
dd06797c10 is described below
commit dd06797c1016c48d8681ff218186a30e99c7c5c4
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Thu Aug 4 06:35:45 2022 -0400
Apply flake8-logging-format changes to tests (#24931)
---
tests/models/test_dagbag.py | 2 +-
.../apache/hive/transfers/test_s3_to_hive.py | 104 ++++++++++-----------
.../google/cloud/utils/gcp_authenticator.py | 20 +---
.../task/task_runner/test_standard_task_runner.py | 2 +-
tests/utils/test_compression.py | 47 ++++------
5 files changed, 76 insertions(+), 99 deletions(-)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index c37cc55b38..7bb371ef9d 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -436,7 +436,7 @@ class TestDagBag:
actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
for dag_id in expected_dag_ids:
- actual_dagbag.log.info(f'validating {dag_id}')
+ actual_dagbag.log.info('validating %s', dag_id)
assert (dag_id in actual_found_dag_ids) == should_be_found, (
f"dag \"{dag_id}\" should {'' if should_be_found else 'not '}"
f"have been found after processing dag \"{expected_parent_dag.dag_id}\""
diff --git a/tests/providers/apache/hive/transfers/test_s3_to_hive.py b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
index 4ca6f39a67..26d7430fdb 100644
--- a/tests/providers/apache/hive/transfers/test_s3_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
@@ -21,7 +21,6 @@ import errno
import filecmp
import logging
import shutil
-import unittest
from collections import OrderedDict
from gzip import GzipFile
from itertools import product
@@ -40,8 +39,9 @@ except ImportError:
mock_s3 = None
-class TestS3ToHiveTransfer(unittest.TestCase):
- def setUp(self):
+class TestS3ToHiveTransfer:
+ @pytest.fixture(autouse=True)
+ def setup_attrs(self):
self.file_names = {}
self.task_id = 'S3ToHiveTransferTest'
self.s3_key = 'S32hive_test_file'
@@ -69,49 +69,45 @@ class TestS3ToHiveTransfer(unittest.TestCase):
'wildcard_match': self.wildcard_match,
'input_compressed': self.input_compressed,
}
- try:
- header = b"Sno\tSome,Text \n"
- line1 = b"1\tAirflow Test\n"
- line2 = b"2\tS32HiveTransfer\n"
- self.tmp_dir = mkdtemp(prefix='test_tmps32hive_')
- # create sample txt, gz and bz2 with and without headers
- with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_h:
- self._set_fn(f_txt_h.name, '.txt', True)
- f_txt_h.writelines([header, line1, line2])
- fn_gz = self._get_fn('.txt', True) + ".gz"
- with GzipFile(filename=fn_gz, mode="wb") as f_gz_h:
- self._set_fn(fn_gz, '.gz', True)
- f_gz_h.writelines([header, line1, line2])
- fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
- with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_h:
- self._set_fn(fn_gz_upper, '.GZ', True)
- f_gz_upper_h.writelines([header, line1, line2])
- fn_bz2 = self._get_fn('.txt', True) + '.bz2'
- with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_h:
- self._set_fn(fn_bz2, '.bz2', True)
- f_bz2_h.writelines([header, line1, line2])
- # create sample txt, bz and bz2 without header
- with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_nh:
- self._set_fn(f_txt_nh.name, '.txt', False)
- f_txt_nh.writelines([line1, line2])
- fn_gz = self._get_fn('.txt', False) + ".gz"
- with GzipFile(filename=fn_gz, mode="wb") as f_gz_nh:
- self._set_fn(fn_gz, '.gz', False)
- f_gz_nh.writelines([line1, line2])
- fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
- with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_nh:
- self._set_fn(fn_gz_upper, '.GZ', False)
- f_gz_upper_nh.writelines([line1, line2])
- fn_bz2 = self._get_fn('.txt', False) + '.bz2'
- with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_nh:
- self._set_fn(fn_bz2, '.bz2', False)
- f_bz2_nh.writelines([line1, line2])
- # Base Exception so it catches Keyboard Interrupt
- except BaseException as e:
- logging.error(e)
- self.tearDown()
+ header = b"Sno\tSome,Text \n"
+ line1 = b"1\tAirflow Test\n"
+ line2 = b"2\tS32HiveTransfer\n"
+ self.tmp_dir = mkdtemp(prefix='test_tmps32hive_')
+ # create sample txt, gz and bz2 with and without headers
+ with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_h:
+ self._set_fn(f_txt_h.name, '.txt', True)
+ f_txt_h.writelines([header, line1, line2])
+ fn_gz = self._get_fn('.txt', True) + ".gz"
+ with GzipFile(filename=fn_gz, mode="wb") as f_gz_h:
+ self._set_fn(fn_gz, '.gz', True)
+ f_gz_h.writelines([header, line1, line2])
+ fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
+ with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_h:
+ self._set_fn(fn_gz_upper, '.GZ', True)
+ f_gz_upper_h.writelines([header, line1, line2])
+ fn_bz2 = self._get_fn('.txt', True) + '.bz2'
+ with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_h:
+ self._set_fn(fn_bz2, '.bz2', True)
+ f_bz2_h.writelines([header, line1, line2])
+ # create sample txt, bz and bz2 without header
+ with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_nh:
+ self._set_fn(f_txt_nh.name, '.txt', False)
+ f_txt_nh.writelines([line1, line2])
+ fn_gz = self._get_fn('.txt', False) + ".gz"
+ with GzipFile(filename=fn_gz, mode="wb") as f_gz_nh:
+ self._set_fn(fn_gz, '.gz', False)
+ f_gz_nh.writelines([line1, line2])
+ fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
+ with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_nh:
+ self._set_fn(fn_gz_upper, '.GZ', False)
+ f_gz_upper_nh.writelines([line1, line2])
+ fn_bz2 = self._get_fn('.txt', False) + '.bz2'
+ with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_nh:
+ self._set_fn(fn_bz2, '.bz2', False)
+ f_bz2_nh.writelines([line1, line2])
+
+ yield
- def tearDown(self):
try:
shutil.rmtree(self.tmp_dir)
except OSError as e:
@@ -152,6 +148,11 @@ class TestS3ToHiveTransfer(unittest.TestCase):
else:
return filecmp.cmp(fn_1, fn_2, shallow=False)
+ @staticmethod
+ def _load_file_side_effect(args, op_fn, ext):
+ check = TestS3ToHiveTransfer._check_file_equality(args[0], op_fn, ext)
+ assert check, f'{ext} output file not as expected'
+
def test_bad_parameters(self):
self.kwargs['check_headers'] = True
self.kwargs['headers'] = False
@@ -194,8 +195,8 @@ class TestS3ToHiveTransfer(unittest.TestCase):
fn_bz2 = self._get_fn('.bz2', False)
assert self._check_file_equality(bz2_txt_nh, fn_bz2, '.bz2'), "bz2 Compressed file not as expected"
- @unittest.skipIf(mock is None, 'mock package not present')
- @unittest.skipIf(mock_s3 is None, 'moto package not present')
+ @pytest.mark.skipif(mock is None, reason='mock package not present')
+ @pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
@mock.patch('airflow.providers.apache.hive.transfers.s3_to_hive.HiveCliHook')
@mock_s3
def test_execute(self, mock_hiveclihook):
@@ -217,16 +218,15 @@ class TestS3ToHiveTransfer(unittest.TestCase):
# file parameter to HiveCliHook.load_file is compared
# against expected file output
- mock_hiveclihook().load_file.side_effect = lambda *args, **kwargs: self.assertTrue(
- self._check_file_equality(args[0], op_fn, ext),
- f'{ext} output file not as expected',
+ mock_hiveclihook().load_file.side_effect = lambda *args, **kwargs: self._load_file_side_effect(
+ args, op_fn, ext
)
# Execute S3ToHiveTransfer
s32hive = S3ToHiveOperator(**self.kwargs)
s32hive.execute(None)
- @unittest.skipIf(mock is None, 'mock package not present')
- @unittest.skipIf(mock_s3 is None, 'moto package not present')
+ @pytest.mark.skipif(mock is None, reason='mock package not present')
+ @pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
@mock.patch('airflow.providers.apache.hive.transfers.s3_to_hive.HiveCliHook')
@mock_s3
def test_execute_with_select_expression(self, mock_hiveclihook):
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index e5de8ee3d6..044b962a33 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -96,8 +96,7 @@ class GcpAuthenticator(CommandExecutor):
key path
:return: None
"""
- session = settings.Session()
- try:
+ with settings.Session() as session:
conn = session.query(Connection).filter(Connection.conn_id == 'google_cloud_default')[0]
extras = conn.extra_dejson
extras[KEYPATH_EXTRA] = self.full_key_path
@@ -106,13 +105,6 @@ class GcpAuthenticator(CommandExecutor):
extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
extras[PROJECT_EXTRA] = self.project_extra if self.project_extra else self.project_id
conn.extra = json.dumps(extras)
- session.commit()
- except BaseException as ex:
- self.log.error('Airflow DB Session error: %s', str(ex))
- session.rollback()
- raise
- finally:
- session.close()
def set_dictionary_in_airflow_connection(self):
"""
@@ -120,8 +112,7 @@ class GcpAuthenticator(CommandExecutor):
of the json service account file.
:return: None
"""
- session = settings.Session()
- try:
+ with settings.Session() as session:
conn = session.query(Connection).filter(Connection.conn_id == 'google_cloud_default')[0]
extras = conn.extra_dejson
with open(self.full_key_path) as path_file:
@@ -132,13 +123,6 @@ class GcpAuthenticator(CommandExecutor):
extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
extras[PROJECT_EXTRA] = self.project_extra
conn.extra = json.dumps(extras)
- session.commit()
- except BaseException as ex:
- self.log.error('Airflow DB Session error: %s', str(ex))
- session.rollback()
- raise
- finally:
- session.close()
def _set_key_path(self):
"""
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 2ed266a543..a726401a8b 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -232,7 +232,7 @@ class TestStandardTaskRunner:
time.sleep(0.01)
logging.info("Task started. Give the task some time to settle")
time.sleep(3)
- logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
+ logging.info("Terminating processes %s belonging to %s group", processes, runner_pgid)
runner.terminate()
session.close() # explicitly close as `create_session`s commit will blow up otherwise
diff --git a/tests/utils/test_compression.py b/tests/utils/test_compression.py
index cfa84f5f03..f8e01bda25 100644
--- a/tests/utils/test_compression.py
+++ b/tests/utils/test_compression.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -20,45 +19,39 @@ import bz2
import errno
import filecmp
import gzip
-import logging
import shutil
import tempfile
-import unittest
import pytest
from airflow.utils import compression
-class TestCompression(unittest.TestCase):
- def setUp(self):
+class TestCompression:
+ @pytest.fixture(autouse=True)
+ def setup_attrs(self):
self.file_names = {}
- try:
- header = b"Sno\tSome,Text \n"
- line1 = b"1\tAirflow Test\n"
- line2 = b"2\tCompressionUtil\n"
- self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
- # create sample txt, gz and bz2 files
- with tempfile.NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt:
- self._set_fn(f_txt.name, '.txt')
- f_txt.writelines([header, line1, line2])
+ header = b"Sno\tSome,Text \n"
+ line1 = b"1\tAirflow Test\n"
+ line2 = b"2\tCompressionUtil\n"
+ self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
+ # create sample txt, gz and bz2 files
+ with tempfile.NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt:
+ self._set_fn(f_txt.name, '.txt')
+ f_txt.writelines([header, line1, line2])
- fn_gz = self._get_fn('.txt') + ".gz"
- with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
- self._set_fn(fn_gz, '.gz')
- f_gz.writelines([header, line1, line2])
+ fn_gz = self._get_fn('.txt') + ".gz"
+ with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
+ self._set_fn(fn_gz, '.gz')
+ f_gz.writelines([header, line1, line2])
- fn_bz2 = self._get_fn('.txt') + '.bz2'
- with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
- self._set_fn(fn_bz2, '.bz2')
- f_bz2.writelines([header, line1, line2])
+ fn_bz2 = self._get_fn('.txt') + '.bz2'
+ with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
+ self._set_fn(fn_bz2, '.bz2')
+ f_bz2.writelines([header, line1, line2])
- # Base Exception so it catches Keyboard Interrupt
- except BaseException as e:
- logging.error(e)
- self.tearDown()
+ yield
- def tearDown(self):
try:
shutil.rmtree(self.tmp_dir)
except OSError as e: