Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/04 10:35:55 UTC

[airflow] branch main updated: Apply flake8-logging-format changes to tests (#24931)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dd06797c10 Apply flake8-logging-format changes to tests (#24931)
dd06797c10 is described below

commit dd06797c1016c48d8681ff218186a30e99c7c5c4
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Thu Aug 4 06:35:45 2022 -0400

    Apply flake8-logging-format changes to tests (#24931)
---
 tests/models/test_dagbag.py                        |   2 +-
 .../apache/hive/transfers/test_s3_to_hive.py       | 104 ++++++++++-----------
 .../google/cloud/utils/gcp_authenticator.py        |  20 +---
 .../task/task_runner/test_standard_task_runner.py  |   2 +-
 tests/utils/test_compression.py                    |  47 ++++------
 5 files changed, 76 insertions(+), 99 deletions(-)
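
For context on the change itself: flake8-logging-format flags logging calls whose message is pre-formatted (for example with an f-string), because the string is built eagerly even when the log level is disabled. The fix, applied throughout the diffs below, is to pass %-style placeholders and the arguments separately so the logging framework interpolates lazily. A minimal sketch of the pattern (names illustrative):

    import logging

    log = logging.getLogger(__name__)
    dag_id = "example_dag"

    # Flagged by flake8-logging-format: the f-string is rendered
    # eagerly, even when INFO logging is disabled.
    log.info(f"validating {dag_id}")

    # Preferred: the placeholder is interpolated only if the record
    # is actually emitted.
    log.info("validating %s", dag_id)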

diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index c37cc55b38..7bb371ef9d 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -436,7 +436,7 @@ class TestDagBag:
         actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
 
         for dag_id in expected_dag_ids:
-            actual_dagbag.log.info(f'validating {dag_id}')
+            actual_dagbag.log.info('validating %s', dag_id)
             assert (dag_id in actual_found_dag_ids) == should_be_found, (
                 f"dag \"{dag_id}\" should {'' if should_be_found else 'not '}"
                 f"have been found after processing dag \"{expected_parent_dag.dag_id}\""
diff --git a/tests/providers/apache/hive/transfers/test_s3_to_hive.py b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
index 4ca6f39a67..26d7430fdb 100644
--- a/tests/providers/apache/hive/transfers/test_s3_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_s3_to_hive.py
@@ -21,7 +21,6 @@ import errno
 import filecmp
 import logging
 import shutil
-import unittest
 from collections import OrderedDict
 from gzip import GzipFile
 from itertools import product
@@ -40,8 +39,9 @@ except ImportError:
     mock_s3 = None
 
 
-class TestS3ToHiveTransfer(unittest.TestCase):
-    def setUp(self):
+class TestS3ToHiveTransfer:
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self):
         self.file_names = {}
         self.task_id = 'S3ToHiveTransferTest'
         self.s3_key = 'S32hive_test_file'
@@ -69,49 +69,45 @@ class TestS3ToHiveTransfer(unittest.TestCase):
             'wildcard_match': self.wildcard_match,
             'input_compressed': self.input_compressed,
         }
-        try:
-            header = b"Sno\tSome,Text \n"
-            line1 = b"1\tAirflow Test\n"
-            line2 = b"2\tS32HiveTransfer\n"
-            self.tmp_dir = mkdtemp(prefix='test_tmps32hive_')
-            # create sample txt, gz and bz2 with and without headers
-            with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_h:
-                self._set_fn(f_txt_h.name, '.txt', True)
-                f_txt_h.writelines([header, line1, line2])
-            fn_gz = self._get_fn('.txt', True) + ".gz"
-            with GzipFile(filename=fn_gz, mode="wb") as f_gz_h:
-                self._set_fn(fn_gz, '.gz', True)
-                f_gz_h.writelines([header, line1, line2])
-            fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
-            with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_h:
-                self._set_fn(fn_gz_upper, '.GZ', True)
-                f_gz_upper_h.writelines([header, line1, line2])
-            fn_bz2 = self._get_fn('.txt', True) + '.bz2'
-            with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_h:
-                self._set_fn(fn_bz2, '.bz2', True)
-                f_bz2_h.writelines([header, line1, line2])
-            # create sample txt, bz and bz2 without header
-            with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_nh:
-                self._set_fn(f_txt_nh.name, '.txt', False)
-                f_txt_nh.writelines([line1, line2])
-            fn_gz = self._get_fn('.txt', False) + ".gz"
-            with GzipFile(filename=fn_gz, mode="wb") as f_gz_nh:
-                self._set_fn(fn_gz, '.gz', False)
-                f_gz_nh.writelines([line1, line2])
-            fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
-            with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_nh:
-                self._set_fn(fn_gz_upper, '.GZ', False)
-                f_gz_upper_nh.writelines([line1, line2])
-            fn_bz2 = self._get_fn('.txt', False) + '.bz2'
-            with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_nh:
-                self._set_fn(fn_bz2, '.bz2', False)
-                f_bz2_nh.writelines([line1, line2])
-        # Base Exception so it catches Keyboard Interrupt
-        except BaseException as e:
-            logging.error(e)
-            self.tearDown()
+        header = b"Sno\tSome,Text \n"
+        line1 = b"1\tAirflow Test\n"
+        line2 = b"2\tS32HiveTransfer\n"
+        self.tmp_dir = mkdtemp(prefix='test_tmps32hive_')
+        # create sample txt, gz and bz2 with and without headers
+        with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_h:
+            self._set_fn(f_txt_h.name, '.txt', True)
+            f_txt_h.writelines([header, line1, line2])
+        fn_gz = self._get_fn('.txt', True) + ".gz"
+        with GzipFile(filename=fn_gz, mode="wb") as f_gz_h:
+            self._set_fn(fn_gz, '.gz', True)
+            f_gz_h.writelines([header, line1, line2])
+        fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
+        with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_h:
+            self._set_fn(fn_gz_upper, '.GZ', True)
+            f_gz_upper_h.writelines([header, line1, line2])
+        fn_bz2 = self._get_fn('.txt', True) + '.bz2'
+        with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_h:
+            self._set_fn(fn_bz2, '.bz2', True)
+            f_bz2_h.writelines([header, line1, line2])
+        # create sample txt, bz and bz2 without header
+        with NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt_nh:
+            self._set_fn(f_txt_nh.name, '.txt', False)
+            f_txt_nh.writelines([line1, line2])
+        fn_gz = self._get_fn('.txt', False) + ".gz"
+        with GzipFile(filename=fn_gz, mode="wb") as f_gz_nh:
+            self._set_fn(fn_gz, '.gz', False)
+            f_gz_nh.writelines([line1, line2])
+        fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
+        with GzipFile(filename=fn_gz_upper, mode="wb") as f_gz_upper_nh:
+            self._set_fn(fn_gz_upper, '.GZ', False)
+            f_gz_upper_nh.writelines([line1, line2])
+        fn_bz2 = self._get_fn('.txt', False) + '.bz2'
+        with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2_nh:
+            self._set_fn(fn_bz2, '.bz2', False)
+            f_bz2_nh.writelines([line1, line2])
+
+        yield
 
-    def tearDown(self):
         try:
             shutil.rmtree(self.tmp_dir)
         except OSError as e:
@@ -152,6 +148,11 @@ class TestS3ToHiveTransfer(unittest.TestCase):
         else:
             return filecmp.cmp(fn_1, fn_2, shallow=False)
 
+    @staticmethod
+    def _load_file_side_effect(args, op_fn, ext):
+        check = TestS3ToHiveTransfer._check_file_equality(args[0], op_fn, ext)
+        assert check, f'{ext} output file not as expected'
+
     def test_bad_parameters(self):
         self.kwargs['check_headers'] = True
         self.kwargs['headers'] = False
@@ -194,8 +195,8 @@ class TestS3ToHiveTransfer(unittest.TestCase):
         fn_bz2 = self._get_fn('.bz2', False)
         assert self._check_file_equality(bz2_txt_nh, fn_bz2, '.bz2'), "bz2 Compressed file not as expected"
 
-    @unittest.skipIf(mock is None, 'mock package not present')
-    @unittest.skipIf(mock_s3 is None, 'moto package not present')
+    @pytest.mark.skipif(mock is None, reason='mock package not present')
+    @pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
     @mock.patch('airflow.providers.apache.hive.transfers.s3_to_hive.HiveCliHook')
     @mock_s3
     def test_execute(self, mock_hiveclihook):
@@ -217,16 +218,15 @@ class TestS3ToHiveTransfer(unittest.TestCase):
 
             # file parameter to HiveCliHook.load_file is compared
             # against expected file output
-            mock_hiveclihook().load_file.side_effect = lambda *args, **kwargs: self.assertTrue(
-                self._check_file_equality(args[0], op_fn, ext),
-                f'{ext} output file not as expected',
+            mock_hiveclihook().load_file.side_effect = lambda *args, **kwargs: self._load_file_side_effect(
+                args, op_fn, ext
             )
             # Execute S3ToHiveTransfer
             s32hive = S3ToHiveOperator(**self.kwargs)
             s32hive.execute(None)
 
-    @unittest.skipIf(mock is None, 'mock package not present')
-    @unittest.skipIf(mock_s3 is None, 'moto package not present')
+    @pytest.mark.skipif(mock is None, reason='mock package not present')
+    @pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
     @mock.patch('airflow.providers.apache.hive.transfers.s3_to_hive.HiveCliHook')
     @mock_s3
     def test_execute_with_select_expression(self, mock_hiveclihook):
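
Beyond logging, the diff above also migrates the test class off unittest: setUp and tearDown collapse into a single autouse fixture, in which everything before the yield is per-test setup and everything after it is teardown; unittest.skipIf becomes pytest.mark.skipif with the message passed as reason=; and a plain assert replaces self.assertTrue, since pytest's assertion rewriting reports the compared values on failure. A minimal, self-contained sketch of the fixture pattern (class and names illustrative, not from the commit):

    import os
    import shutil
    import tempfile

    import pytest


    class TestExample:
        @pytest.fixture(autouse=True)
        def setup_attrs(self):
            # setup: runs before every test method in the class
            self.tmp_dir = tempfile.mkdtemp(prefix="example_")
            yield  # the test body executes at this point
            # teardown: runs after each test, whether it passed or failed
            shutil.rmtree(self.tmp_dir, ignore_errors=True)

        def test_tmp_dir_exists(self):
            assert os.path.isdir(self.tmp_dir)
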
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index e5de8ee3d6..044b962a33 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -96,8 +96,7 @@ class GcpAuthenticator(CommandExecutor):
         key path
         :return: None
         """
-        session = settings.Session()
-        try:
+        with settings.Session() as session:
             conn = session.query(Connection).filter(Connection.conn_id == 'google_cloud_default')[0]
             extras = conn.extra_dejson
             extras[KEYPATH_EXTRA] = self.full_key_path
@@ -106,13 +105,6 @@ class GcpAuthenticator(CommandExecutor):
             extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
             extras[PROJECT_EXTRA] = self.project_extra if self.project_extra else self.project_id
             conn.extra = json.dumps(extras)
-            session.commit()
-        except BaseException as ex:
-            self.log.error('Airflow DB Session error: %s', str(ex))
-            session.rollback()
-            raise
-        finally:
-            session.close()
 
     def set_dictionary_in_airflow_connection(self):
         """
@@ -120,8 +112,7 @@ class GcpAuthenticator(CommandExecutor):
         of the json service account file.
         :return: None
         """
-        session = settings.Session()
-        try:
+        with settings.Session() as session:
             conn = session.query(Connection).filter(Connection.conn_id == 'google_cloud_default')[0]
             extras = conn.extra_dejson
             with open(self.full_key_path) as path_file:
@@ -132,13 +123,6 @@ class GcpAuthenticator(CommandExecutor):
             extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
             extras[PROJECT_EXTRA] = self.project_extra
             conn.extra = json.dumps(extras)
-            session.commit()
-        except BaseException as ex:
-            self.log.error('Airflow DB Session error: %s', str(ex))
-            session.rollback()
-            raise
-        finally:
-            session.close()
 
     def _set_key_path(self):
         """
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 2ed266a543..a726401a8b 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -232,7 +232,7 @@ class TestStandardTaskRunner:
                     time.sleep(0.01)
             logging.info("Task started. Give the task some time to settle")
             time.sleep(3)
-            logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
+            logging.info("Terminating processes %s belonging to %s group", processes, runner_pgid)
             runner.terminate()
             session.close()  # explicitly close as `create_session`s commit will blow up otherwise
 
diff --git a/tests/utils/test_compression.py b/tests/utils/test_compression.py
index cfa84f5f03..f8e01bda25 100644
--- a/tests/utils/test_compression.py
+++ b/tests/utils/test_compression.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -20,45 +19,39 @@ import bz2
 import errno
 import filecmp
 import gzip
-import logging
 import shutil
 import tempfile
-import unittest
 
 import pytest
 
 from airflow.utils import compression
 
 
-class TestCompression(unittest.TestCase):
-    def setUp(self):
+class TestCompression:
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self):
         self.file_names = {}
-        try:
-            header = b"Sno\tSome,Text \n"
-            line1 = b"1\tAirflow Test\n"
-            line2 = b"2\tCompressionUtil\n"
-            self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
-            # create sample txt, gz and bz2 files
-            with tempfile.NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt:
-                self._set_fn(f_txt.name, '.txt')
-                f_txt.writelines([header, line1, line2])
+        header = b"Sno\tSome,Text \n"
+        line1 = b"1\tAirflow Test\n"
+        line2 = b"2\tCompressionUtil\n"
+        self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
+        # create sample txt, gz and bz2 files
+        with tempfile.NamedTemporaryFile(mode='wb+', dir=self.tmp_dir, delete=False) as f_txt:
+            self._set_fn(f_txt.name, '.txt')
+            f_txt.writelines([header, line1, line2])
 
-            fn_gz = self._get_fn('.txt') + ".gz"
-            with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
-                self._set_fn(fn_gz, '.gz')
-                f_gz.writelines([header, line1, line2])
+        fn_gz = self._get_fn('.txt') + ".gz"
+        with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
+            self._set_fn(fn_gz, '.gz')
+            f_gz.writelines([header, line1, line2])
 
-            fn_bz2 = self._get_fn('.txt') + '.bz2'
-            with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
-                self._set_fn(fn_bz2, '.bz2')
-                f_bz2.writelines([header, line1, line2])
+        fn_bz2 = self._get_fn('.txt') + '.bz2'
+        with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
+            self._set_fn(fn_bz2, '.bz2')
+            f_bz2.writelines([header, line1, line2])
 
-        # Base Exception so it catches Keyboard Interrupt
-        except BaseException as e:
-            logging.error(e)
-            self.tearDown()
+        yield
 
-    def tearDown(self):
         try:
             shutil.rmtree(self.tmp_dir)
         except OSError as e: